hubs: get_default_hub() on Windows broken by kqueue; Thanks to Paul Oppenheim

Fixes https://github.com/eventlet/eventlet/issues/38
+autopep8
This commit is contained in:
Sergey Shepelev
2013-07-02 18:29:48 +04:00
parent 747b753a20
commit b87ed20e8c
3 changed files with 99 additions and 35 deletions

View File

@@ -16,12 +16,13 @@ __all__ = ["use_hub", "get_hub", "get_default_hub", "trampoline"]
threading = patcher.original('threading')
_threadlocal = threading.local()
def get_default_hub():
"""Select the default hub implementation based on what multiplexing
libraries are installed. The order that the hubs are tried is:
* twistedr
* epoll
* kqueue
* poll
* select
@@ -33,10 +34,10 @@ def get_default_hub():
"""
# pyevent hub disabled for now because it is not thread-safe
#try:
# try:
# import eventlet.hubs.pyevent
# return eventlet.hubs.pyevent
#except:
# except:
# pass
select = patcher.original('select')
@@ -91,12 +92,14 @@ def use_hub(mod=None):
mod, found = entry.load(), True
break
if not found:
mod = __import__('eventlet.hubs.' + mod, globals(), locals(), ['Hub'])
mod = __import__(
'eventlet.hubs.' + mod, globals(), locals(), ['Hub'])
if hasattr(mod, 'Hub'):
_threadlocal.Hub = mod.Hub
else:
_threadlocal.Hub = mod
def get_hub():
"""Get the current event hub singleton object.
@@ -113,6 +116,8 @@ def get_hub():
return hub
from eventlet import timeout
def trampoline(fd, read=None, write=None, timeout=None,
timeout_exc=timeout.Timeout):
"""Suspend the current coroutine until the given socket object or file
@@ -133,7 +138,8 @@ def trampoline(fd, read=None, write=None, timeout=None,
hub = get_hub()
current = greenlet.getcurrent()
assert hub.greenlet is not current, 'do not call blocking functions from the mainloop'
assert not (read and write), 'not allowed to trampoline for reading and writing'
assert not (
read and write), 'not allowed to trampoline for reading and writing'
try:
fileno = fd.fileno()
except AttributeError:

View File

@@ -8,9 +8,15 @@ sleep = time.sleep
from eventlet.support import get_errno, clear_sys_exc_info
from eventlet.hubs.hub import BaseHub, READ, WRITE, noop
if getattr(select, 'kqueue', None) is None:
raise ImportError('No kqueue implementation found in select module')
FILTERS = {READ: select.KQ_FILTER_READ,
WRITE: select.KQ_FILTER_WRITE}
class Hub(BaseHub):
MAX_EVENTS = 100
@@ -47,7 +53,7 @@ class Hub(BaseHub):
if evtype not in events:
try:
event = select.kevent(fileno,
FILTERS.get(evtype), select.KQ_EV_ADD)
FILTERS.get(evtype), select.KQ_EV_ADD)
self._control([event], 0, 0)
events[evtype] = event
except ValueError:

View File

@@ -2,6 +2,7 @@ from __future__ import with_statement
import sys
from tests import LimitedTestCase, main, skip_with_pyevent, skip_if_no_itimer, skip_unless
from tests.patcher_test import ProcessBase
import time
import eventlet
from eventlet import hubs
@@ -11,11 +12,15 @@ from eventlet.semaphore import Semaphore
from eventlet.support import greenlets
DELAY = 0.001
def noop():
pass
class TestTimerCleanup(LimitedTestCase):
TEST_TIMEOUT = 2
@skip_with_pyevent
def test_cancel_immediate(self):
hub = hubs.get_hub()
@@ -25,12 +30,11 @@ class TestTimerCleanup(LimitedTestCase):
t = hubs.get_hub().schedule_call_global(60, noop)
t.cancel()
self.assert_less_than_equal(hub.timers_canceled,
hub.get_timers_count() + 1)
hub.get_timers_count() + 1)
# there should be fewer than 1000 new timers and canceled
self.assert_less_than_equal(hub.get_timers_count(), 1000 + stimers)
self.assert_less_than_equal(hub.timers_canceled, 1000)
@skip_with_pyevent
def test_cancel_accumulated(self):
hub = hubs.get_hub()
@@ -40,10 +44,10 @@ class TestTimerCleanup(LimitedTestCase):
t = hubs.get_hub().schedule_call_global(60, noop)
eventlet.sleep()
self.assert_less_than_equal(hub.timers_canceled,
hub.get_timers_count() + 1)
hub.get_timers_count() + 1)
t.cancel()
self.assert_less_than_equal(hub.timers_canceled,
hub.get_timers_count() + 1, hub.timers)
hub.get_timers_count() + 1, hub.timers)
# there should be fewer than 1000 new timers and canceled
self.assert_less_than_equal(hub.get_timers_count(), 1000 + stimers)
self.assert_less_than_equal(hub.timers_canceled, 1000)
@@ -81,31 +85,33 @@ class TestTimerCleanup(LimitedTestCase):
class TestScheduleCall(LimitedTestCase):
def test_local(self):
lst = [1]
eventlet.spawn(hubs.get_hub().schedule_call_local, DELAY, lst.pop)
eventlet.sleep(0)
eventlet.sleep(DELAY*2)
eventlet.sleep(DELAY * 2)
assert lst == [1], lst
def test_global(self):
lst = [1]
eventlet.spawn(hubs.get_hub().schedule_call_global, DELAY, lst.pop)
eventlet.sleep(0)
eventlet.sleep(DELAY*2)
eventlet.sleep(DELAY * 2)
assert lst == [], lst
def test_ordering(self):
lst = []
hubs.get_hub().schedule_call_global(DELAY*2, lst.append, 3)
hubs.get_hub().schedule_call_global(DELAY * 2, lst.append, 3)
hubs.get_hub().schedule_call_global(DELAY, lst.append, 1)
hubs.get_hub().schedule_call_global(DELAY, lst.append, 2)
while len(lst) < 3:
eventlet.sleep(DELAY)
self.assertEquals(lst, [1,2,3])
self.assertEquals(lst, [1, 2, 3])
class TestDebug(LimitedTestCase):
def test_debug_listeners(self):
hubs.get_hub().set_debug_listeners(True)
hubs.get_hub().set_debug_listeners(False)
@@ -116,16 +122,20 @@ class TestDebug(LimitedTestCase):
class TestExceptionInMainloop(LimitedTestCase):
def test_sleep(self):
# even if there was an error in the mainloop, the hub should continue to work
# even if there was an error in the mainloop, the hub should continue
# to work
start = time.time()
eventlet.sleep(DELAY)
delay = time.time() - start
assert delay >= DELAY*0.9, 'sleep returned after %s seconds (was scheduled for %s)' % (delay, DELAY)
assert delay >= DELAY * \
0.9, 'sleep returned after %s seconds (was scheduled for %s)' % (
delay, DELAY)
def fail():
1//0
1 // 0
hubs.get_hub().schedule_call_global(0, fail)
@@ -133,10 +143,13 @@ class TestExceptionInMainloop(LimitedTestCase):
eventlet.sleep(DELAY)
delay = time.time() - start
assert delay >= DELAY*0.9, 'sleep returned after %s seconds (was scheduled for %s)' % (delay, DELAY)
assert delay >= DELAY * \
0.9, 'sleep returned after %s seconds (was scheduled for %s)' % (
delay, DELAY)
class TestExceptionInGreenthread(LimitedTestCase):
@skip_unless(greenlets.preserves_excinfo)
def test_exceptionpreservation(self):
# events for controlling execution order
@@ -193,6 +206,7 @@ class TestExceptionInGreenthread(LimitedTestCase):
class TestHubSelection(LimitedTestCase):
def test_explicit_hub(self):
if getattr(hubs.get_hub(), 'uses_twisted_reactor', None):
# doesn't work with twisted
@@ -207,6 +221,7 @@ class TestHubSelection(LimitedTestCase):
class TestHubBlockingDetector(LimitedTestCase):
TEST_TIMEOUT = 10
@skip_with_pyevent
def test_block_detect(self):
def look_im_blocking():
@@ -233,7 +248,8 @@ class TestHubBlockingDetector(LimitedTestCase):
class TestSuspend(LimitedTestCase):
TEST_TIMEOUT=3
TEST_TIMEOUT = 3
def test_suspend_doesnt_crash(self):
import errno
import os
@@ -258,9 +274,9 @@ except eventlet.Timeout:
new_env['PYTHONPATH'] = python_path
p = subprocess.Popen([sys.executable,
os.path.join(self.tempdir, filename)],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=new_env)
stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=new_env)
eventlet.sleep(0.4) # wait for process to hit accept
os.kill(p.pid, signal.SIGSTOP) # suspend and resume to generate EINTR
os.kill(p.pid, signal.SIGSTOP) # suspend and resume to generate EINTR
os.kill(p.pid, signal.SIGCONT)
output, _ = p.communicate()
lines = [l for l in output.split("\n") if l]
@@ -269,6 +285,7 @@ except eventlet.Timeout:
class TestBadFilenos(LimitedTestCase):
@skip_with_pyevent
def test_repeated_selects(self):
from eventlet.green import select
@@ -276,8 +293,8 @@ class TestBadFilenos(LimitedTestCase):
self.assertRaises(ValueError, select.select, [-1], [], [])
from tests.patcher_test import ProcessBase
class TestFork(ProcessBase):
@skip_with_pyevent
def test_fork(self):
new_mod = """
@@ -312,7 +329,7 @@ else:
class TestDeadRunLoop(LimitedTestCase):
TEST_TIMEOUT=2
TEST_TIMEOUT = 2
class CustomException(Exception):
pass
@@ -330,7 +347,7 @@ class TestDeadRunLoop(LimitedTestCase):
eventlet.sleep(0) # let dummyproc run
assert hub.greenlet.parent == eventlet.greenthread.getcurrent()
self.assertRaises(KeyboardInterrupt, hub.greenlet.throw,
KeyboardInterrupt())
KeyboardInterrupt())
# kill dummyproc, this schedules a timer to return execution to
# this greenlet before throwing an exception in dummyproc.
@@ -355,7 +372,7 @@ class TestDeadRunLoop(LimitedTestCase):
g = eventlet.spawn(dummyproc)
assert hub.greenlet.parent == eventlet.greenthread.getcurrent()
self.assertRaises(KeyboardInterrupt, hub.greenlet.throw,
KeyboardInterrupt())
KeyboardInterrupt())
assert not g.dead # check dummyproc hasn't completed
with eventlet.Timeout(0.5, self.CustomException()):
@@ -370,6 +387,41 @@ class TestDeadRunLoop(LimitedTestCase):
class Foo(object):
pass
if __name__=='__main__':
main()
class TestDefaultHub(ProcessBase):
def test_kqueue_unsupported(self):
# https://github.com/eventlet/eventlet/issues/38
# get_hub on windows broken by kqueue
module_source = r'''
# Simulate absence of kqueue even on platforms that support it.
import select
try:
del select.kqueue
except AttributeError:
pass
import __builtin__
original_import = __builtin__.__import__
def fail_import(name, *args, **kwargs):
if 'epoll' in name:
raise ImportError('disabled for test')
if 'kqueue' in name:
print('kqueue tried')
return original_import(name, *args, **kwargs)
__builtin__.__import__ = fail_import
import eventlet.hubs
eventlet.hubs.get_default_hub()
print('ok')
'''
self.write_to_tempfile('newmod', module_source)
output, _ = self.launch_subprocess('newmod.py')
self.assertEqual(output, 'kqueue tried\nok\n')
if __name__ == '__main__':
main()