Fold Runloop into libeventhub.Hub.

Remove hub.runloop.etc from other library modules.
This commit is contained in:
nat
2008-03-17 00:51:25 -04:00
parent aec9f79fc2
commit 9a0bfd7045
7 changed files with 205 additions and 36 deletions

View File

@@ -33,9 +33,9 @@ def check_hub():
api.sleep(0) api.sleep(0)
assert not api.get_hub().descriptors, repr(api.get_hub().descriptors) assert not api.get_hub().descriptors, repr(api.get_hub().descriptors)
# Stop the runloop # Stop the runloop
api.get_hub().runloop.abort() api.get_hub().abort()
api.sleep(0) api.sleep(0)
assert not api.get_hub().runloop.running assert not api.get_hub().running
class TestApi(tests.TestCase): class TestApi(tests.TestCase):

View File

@@ -263,7 +263,7 @@ class CoroutinePool(pools.Pool):
sender.reset() sender.reset()
(evt, func, args, kw) = recvd (evt, func, args, kw) = recvd
self._safe_apply(evt, func, args, kw) self._safe_apply(evt, func, args, kw)
api.get_hub().runloop.cancel_timers(api.getcurrent()) api.get_hub().cancel_timers(api.getcurrent())
self.put(sender) self.put(sender)
def _safe_apply(self, evt, func, args, kw): def _safe_apply(self, evt, func, args, kw):

View File

@@ -501,7 +501,7 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler):
body=err.body) body=err.body)
finally: finally:
# clean up any timers that might have been left around by the handling code # clean up any timers that might have been left around by the handling code
api.get_hub().runloop.cancel_timers(api.getcurrent()) api.get_hub().cancel_timers(api.getcurrent())
# throw an exception if it failed to write a body # throw an exception if it failed to write a body
if not request.response_written(): if not request.response_written():

View File

@@ -1,5 +1,5 @@
"""\ """\
@file pollhub.py @file libeventhub.py
@author Bob Ippolito @author Bob Ippolito
Copyright (c) 2005-2006, Bob Ippolito Copyright (c) 2005-2006, Bob Ippolito
@@ -27,10 +27,10 @@ import sys
import socket import socket
import errno import errno
import traceback import traceback
from time import sleep import time
from eventlet import greenlib from eventlet import greenlib
from eventlet.runloop import RunLoop, Timer from eventlet.timer import Timer
import greenlet import greenlet
@@ -47,25 +47,37 @@ import event
class Hub(object): class Hub(object):
def __init__(self): SYSTEM_EXCEPTIONS = (KeyboardInterrupt, SystemExit)
self.runloop = RunLoop(self.wait)
def __init__(self, clock=time.time):
self.clock = clock
self.readers = {} self.readers = {}
self.writers = {} self.writers = {}
self.greenlet = None self.greenlet = None
self.stopping = False
self.running = False
self.timers = []
self.timers_by_greenlet = {}
self.next_timers = []
self.observers = {}
self.observer_modes = {
'entry': [],
'before_timers': [],
'before_waiting': [],
'after_waiting': [],
'exit': [],
}
event.init() event.init()
def stop(self): def stop(self):
self.runloop.abort() self.abort()
if self.greenlet is not greenlet.getcurrent(): if self.greenlet is not greenlet.getcurrent():
self.switch() self.switch()
def schedule_call(self, *args, **kw):
return self.runloop.schedule_call(*args, **kw)
def switch(self): def switch(self):
if not self.greenlet: if not self.greenlet:
self.greenlet = greenlib.tracked_greenlet() self.greenlet = greenlib.tracked_greenlet()
args = ((self.runloop.run,),) args = ((self.run,),)
else: else:
args = () args = ()
try: try:
@@ -110,7 +122,7 @@ class Hub(object):
def wait(self, seconds=None): def wait(self, seconds=None):
if not self.readers and not self.writers: if not self.readers and not self.writers:
if seconds: if seconds:
sleep(seconds) time.sleep(seconds)
return return
timer = event.timeout(seconds, lambda: None) timer = event.timeout(seconds, lambda: None)
@@ -119,3 +131,160 @@ class Hub(object):
if status == -1: if status == -1:
raise RuntimeError("does this ever happen?") raise RuntimeError("does this ever happen?")
def default_sleep(self):
return 60.0
def sleep_until(self):
t = self.timers
if not t:
return None
return t[0][0]
def run(self):
"""Run the runloop until abort is called.
"""
if self.running:
raise RuntimeError("Already running!")
try:
self.running = True
self.stopping = False
self.fire_observers('entry')
while not self.stopping:
self.prepare_timers()
self.fire_observers('before_timers')
self.fire_timers(self.clock())
self.prepare_timers()
wakeup_when = self.sleep_until()
if wakeup_when is None:
sleep_time = self.default_sleep()
else:
sleep_time = wakeup_when - self.clock()
if sleep_time > 0:
self.fire_observers('before_waiting')
self.wait(sleep_time)
self.fire_observers('after_waiting')
else:
self.wait(0)
else:
del self.timers[:]
del self.next_timers[:]
self.fire_observers('exit')
finally:
self.running = False
self.stopping = False
def abort(self):
"""Stop the runloop. If run is executing, it will exit after completing
the next runloop iteration.
"""
if self.running:
self.stopping = True
def add_observer(self, observer, *modes):
"""Add an event observer to this runloop with the given modes.
Valid modes are:
entry: The runloop is being entered.
before_timers: Before the expired timers for this iteration are executed.
before_waiting: Before waiting for the calculated wait_time
where nothing will happen.
after_waiting: After waiting, immediately before starting the top of the
runloop again.
exit: The runloop is exiting.
If no mode is passed or mode is all, the observer will be fired for every
event type.
"""
if not modes or modes == ('all',):
modes = tuple(self.observer_modes)
self.observers[observer] = modes
for mode in modes:
self.observer_modes[mode].append(observer)
def remove_observer(self, observer):
"""Remove a previously registered observer from all event types.
"""
for mode in self.observers.pop(observer, ()):
self.observer_modes[mode].remove(observer)
def squelch_observer_exception(self, observer, exc_info):
traceback.print_exception(*exc_info)
print >>sys.stderr, "Removing observer: %r" % (observer,)
self.remove_observer(observer)
def fire_observers(self, activity):
for observer in self.observer_modes[activity]:
try:
observer(self, activity)
except self.SYSTEM_EXCEPTIONS:
raise
except:
self.squelch_observer_exception(observer, sys.exc_info())
def squelch_timer_exception(self, timer, exc_info):
traceback.print_exception(*exc_info)
print >>sys.stderr, "Timer raised: %r" % (timer,)
def _add_absolute_timer(self, when, info):
# the 0 placeholder makes it easy to bisect_right using (now, 1)
self.next_timers.append((when, 0, info))
def add_timer(self, timer):
scheduled_time = self.clock() + timer.seconds
self._add_absolute_timer(scheduled_time, timer)
current_greenlet = greenlet.getcurrent()
if current_greenlet not in self.timers_by_greenlet:
self.timers_by_greenlet[current_greenlet] = {}
self.timers_by_greenlet[current_greenlet][timer] = True
timer.greenlet = current_greenlet
return scheduled_time
def prepare_timers(self):
ins = bisect.insort_right
t = self.timers
for item in self.next_timers:
ins(t, item)
del self.next_timers[:]
def schedule_call(self, seconds, cb, *args, **kw):
"""Schedule a callable to be called after 'seconds' seconds have
elapsed.
seconds: The number of seconds to wait.
cb: The callable to call after the given time.
*args: Arguments to pass to the callable when called.
**kw: Keyword arguments to pass to the callable when called.
"""
t = Timer(seconds, cb, *args, **kw)
self.add_timer(t)
return t
def fire_timers(self, when):
t = self.timers
last = bisect.bisect_right(t, (when, 1))
i = 0
for i in xrange(last):
timer = t[i][2]
try:
try:
timer()
except self.SYSTEM_EXCEPTIONS:
raise
except:
self.squelch_timer_exception(timer, sys.exc_info())
finally:
try:
del self.timers_by_greenlet[timer.greenlet][timer]
except KeyError:
pass
del t[:last]
def cancel_timers(self, greenlet):
if greenlet not in self.timers_by_greenlet:
return
for timer in self.timers_by_greenlet[greenlet]:
if not timer.cancelled and timer.seconds:
## If timer.seconds is 0, this isn't a timer, it's
## actually eventlet's silly way of specifying whether
## a coroutine is "ready to run" or not.
timer.cancel()
print 'Runloop cancelling left-over timer %s' % timer
del self.timers_by_greenlet[greenlet]

View File

@@ -66,7 +66,7 @@ class Timer(object):
"""Schedule this timer to run in the current runloop. """Schedule this timer to run in the current runloop.
""" """
self.called = False self.called = False
self.scheduled_time = get_hub().runloop.add_timer(self) self.scheduled_time = get_hub().add_timer(self)
return self return self
def __call__(self): def __call__(self):

View File

@@ -24,7 +24,7 @@ THE SOFTWARE.
import unittest import unittest
from eventlet import api, runloop, tests, timer from eventlet import api, tests, timer
class TestTimer(tests.TestCase): class TestTimer(tests.TestCase):
mode = 'static' mode = 'static'
@@ -35,32 +35,32 @@ class TestTimer(tests.TestCase):
assert t.tpl == t2.tpl assert t.tpl == t2.tpl
assert t.called == t2.called assert t.called == t2.called
def test_cancel(self): ## def test_cancel(self):
r = runloop.RunLoop() ## r = runloop.RunLoop()
called = [] ## called = []
t = timer.Timer(0, lambda: called.append(True)) ## t = timer.Timer(0, lambda: called.append(True))
t.cancel() ## t.cancel()
r.add_timer(t) ## r.add_timer(t)
r.add_observer(lambda r, activity: r.abort(), 'after_waiting') ## r.add_observer(lambda r, activity: r.abort(), 'after_waiting')
r.run() ## r.run()
assert not called ## assert not called
assert not r.running ## assert not r.running
def test_schedule(self): def test_schedule(self):
hub = api.get_hub() hub = api.get_hub()
r = hub.runloop ## r = hub.runloop
# clean up the runloop, preventing side effects from previous tests # clean up the runloop, preventing side effects from previous tests
# on this thread # on this thread
if r.running: if hub.running:
r.abort() hub.abort()
api.sleep(0) api.sleep(0)
called = [] called = []
t = timer.Timer(0, lambda: (called.append(True), hub.runloop.abort())) t = timer.Timer(0, lambda: (called.append(True), hub.abort()))
t.schedule() t.schedule()
r.default_sleep = lambda: 0.0 hub.default_sleep = lambda: 0.0
r.run() hub.run()
assert called assert called
assert not r.running assert not hub.running
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View File

@@ -82,13 +82,13 @@ class EventletReactor(posixbase.PosixReactorBase):
self.running = True self.running = True
self._stopper = api.call_after(sys.maxint / 1000.0, lambda: None) self._stopper = api.call_after(sys.maxint / 1000.0, lambda: None)
## schedule a call way in the future, and cancel it in stop? ## schedule a call way in the future, and cancel it in stop?
api.get_hub().runloop.run() api.get_hub().run()
def stop(self): def stop(self):
self._stopper.cancel() self._stopper.cancel()
posixbase.PosixReactorBase.stop(self) posixbase.PosixReactorBase.stop(self)
api.get_hub().remove_descriptor(self._readers.keys()[0]) api.get_hub().remove_descriptor(self._readers.keys()[0])
api.get_hub().runloop.abort() api.get_hub().abort()
def addReader(self, reader): def addReader(self, reader):
fileno = reader.fileno() fileno = reader.fileno()