Merge changes from Donovan and Ryan

This commit is contained in:
nat
2008-03-19 18:02:11 -04:00
6 changed files with 65 additions and 27 deletions

View File

@@ -237,19 +237,13 @@ def get_default_hub():
except ImportError:
pass
try:
import eventlet.hubs.kqueue
return eventlet.hubs.kqueue
except ImportError:
pass
import select
if hasattr(select, 'poll'):
import eventlet.hubs.poll
return eventlet.hubs.poll
else:
import eventlet.hubs.select
return eventlet.hubs.select
import eventlet.hubs.selecthub
return eventlet.hubs.selecthub
def use_hub(mod=None):

View File

@@ -34,6 +34,9 @@ from eventlet import greenlib
from eventlet.timer import Timer
class BaseHub(object):
""" Base hub class for easing the implementation of subclasses that are
specific to a particular underlying event architecture. """
SYSTEM_EXCEPTIONS = (KeyboardInterrupt, SystemExit)
def __init__(self, clock=time.time):
@@ -58,6 +61,20 @@ class BaseHub(object):
}
def add_descriptor(self, fileno, read=None, write=None, exc=None):
""" Signals an intent to read/write from a particular file descriptor.
The fileno argument is the file number of the file of interest. The other
arguments are either callbacks or None. If there is a callback for read
or write, the hub sets things up so that when the file descriptor is
ready to be read or written, the callback is called.
The exc callback is called when the socket represented by the file
descriptor is closed. The intent is that the the exc callbacks should
only be present when either a read or write callback is also present,
so the exc callback happens instead of the respective read or write
callback.
"""
self.readers[fileno] = read or self.readers.get(fileno)
self.writers[fileno] = write or self.writers.get(fileno)
self.excs[fileno] = exc or self.excs.get(fileno)
@@ -203,16 +220,18 @@ class BaseHub(object):
def add_timer(self, timer):
scheduled_time = self.clock() + timer.seconds
self._add_absolute_timer(scheduled_time, timer)
timer.greenlet = current_greenlet
self.track_timer(timer)
return scheduled_time
def track_timer(self, timer):
current_greenlet = greenlet.getcurrent()
timer.greenlet = current_greenlet
if current_greenlet not in self.timers_by_greenlet:
self.timers_by_greenlet[current_greenlet] = {}
self.timers_by_greenlet[current_greenlet][timer] = True
def timer_canceled(self, timer):
pass
def prepare_timers(self):
ins = bisect.insort_right

View File

@@ -22,6 +22,7 @@ THE SOFTWARE.
"""
import bisect
import signal
import sys
import socket
import errno
@@ -38,14 +39,15 @@ import greenlet
#raise ImportError()
try:
import event
except ImportError:
# use rel if pyevent isn't available
# (rel prints out some annoying notice upon initialization)
# use rel if available
import rel
rel.initialize()
rel.override()
import event
except ImportError:
# don't have rel, but might still have libevent
pass
import event
class Hub(hub.BaseHub):
@@ -56,9 +58,8 @@ class Hub(hub.BaseHub):
self.interrupted = False
event.init()
# catch SIGINT
signal = event.signal(2, self.signal_received, 2)
signal.add()
sig = event.signal(signal.SIGINT, self.signal_received, signal.SIGINT)
sig.add()
def add_descriptor(self, fileno, read=None, write=None, exc=None):
@@ -83,16 +84,23 @@ class Hub(hub.BaseHub):
self.excs.pop(fileno, None)
def signal_received(self, signal):
# can only set this flag here because the pyevent callback mechanism
# swallows exceptions raised here, so we have to raise in the 'main'
# greenlet to kill the program
self.interrupted = True
def wait(self, seconds=None):
timer = event.timeout(seconds, lambda: None)
# this timeout will cause us to return from the dispatch() call
# when we want to
def abc():
pass
timer = event.timeout(seconds, abc)
timer.add()
status = event.loop()
if status == -1:
raise RuntimeError("does this ever happen?")
status = event.dispatch()
# we are explicitly ignoring the status because in our experience it's
# harmless and there's nothing meaningful we could do with it anyway
timer.delete()
if self.interrupted:
@@ -100,6 +108,16 @@ class Hub(hub.BaseHub):
raise KeyboardInterrupt()
def add_timer(self, timer):
event.timeout(timer.seconds, timer).add()
# eventtimer is the pyevent object representing the timer
eventtimer = event.timeout(timer.seconds, timer)
timer.impltimer = eventtimer
eventtimer.add()
self.track_timer(timer)
def timer_canceled(self, timer):
""" Cancels the underlying libevent timer. """
try:
timer.impltimer.delete()
except AttributeError:
pass

View File

@@ -1,5 +1,5 @@
"""\
@file select.py
@file selecthub.py
Copyright (c) 2005-2006, Bob Ippolito
Copyright (c) 2007, Linden Research, Inc.

View File

@@ -29,7 +29,7 @@ useful for debugging leaking timers, to find out where the timer was set up. """
_g_debug = False
class Timer(object):
__slots__ = ['seconds', 'tpl', 'called', 'cancelled', 'scheduled_time', 'greenlet', 'traceback']
__slots__ = ['seconds', 'tpl', 'called', 'cancelled', 'scheduled_time', 'greenlet', 'traceback', 'impltimer']
def __init__(self, seconds, cb, *args, **kw):
"""Create a timer.
seconds: The minimum number of seconds to wait before calling
@@ -81,3 +81,4 @@ class Timer(object):
"""
self.cancelled = True
self.called = True
get_hub().timer_canceled(self)

View File

@@ -90,8 +90,12 @@ def wrap_ssl(sock, certificate=None, private_key=None):
connection.set_connect_state()
return greenio.GreenSocket(connection)
socket_already_wrapped = False
def wrap_socket_with_coroutine_socket():
global socket_already_wrapped
if socket_already_wrapped:
return
def new_socket(*args, **kw):
from eventlet import greenio
s = __original_socket__(*args, **kw)
@@ -100,6 +104,8 @@ def wrap_socket_with_coroutine_socket():
socket.socket = new_socket
socket.ssl = wrap_ssl
socket_already_wrapped = True
def socket_bind_and_listen(descriptor, addr=('', 0), backlog=50):