diff --git a/eventlet/hubs/__init__.py b/eventlet/hubs/__init__.py index bdc6f3c..9b4c2b7 100644 --- a/eventlet/hubs/__init__.py +++ b/eventlet/hubs/__init__.py @@ -16,12 +16,13 @@ __all__ = ["use_hub", "get_hub", "get_default_hub", "trampoline"] threading = patcher.original('threading') _threadlocal = threading.local() + def get_default_hub(): """Select the default hub implementation based on what multiplexing libraries are installed. The order that the hubs are tried is: - * twistedr * epoll + * kqueue * poll * select @@ -33,10 +34,10 @@ def get_default_hub(): """ # pyevent hub disabled for now because it is not thread-safe - #try: + # try: # import eventlet.hubs.pyevent # return eventlet.hubs.pyevent - #except: + # except: # pass select = patcher.original('select') @@ -91,12 +92,14 @@ def use_hub(mod=None): mod, found = entry.load(), True break if not found: - mod = __import__('eventlet.hubs.' + mod, globals(), locals(), ['Hub']) + mod = __import__( + 'eventlet.hubs.' + mod, globals(), locals(), ['Hub']) if hasattr(mod, 'Hub'): _threadlocal.Hub = mod.Hub else: _threadlocal.Hub = mod + def get_hub(): """Get the current event hub singleton object. @@ -113,6 +116,8 @@ def get_hub(): return hub from eventlet import timeout + + def trampoline(fd, read=None, write=None, timeout=None, timeout_exc=timeout.Timeout): """Suspend the current coroutine until the given socket object or file @@ -133,7 +138,8 @@ def trampoline(fd, read=None, write=None, timeout=None, hub = get_hub() current = greenlet.getcurrent() assert hub.greenlet is not current, 'do not call blocking functions from the mainloop' - assert not (read and write), 'not allowed to trampoline for reading and writing' + assert not ( + read and write), 'not allowed to trampoline for reading and writing' try: fileno = fd.fileno() except AttributeError: diff --git a/eventlet/hubs/kqueue.py b/eventlet/hubs/kqueue.py index b24df83..9fbc9e2 100644 --- a/eventlet/hubs/kqueue.py +++ b/eventlet/hubs/kqueue.py @@ -8,9 +8,15 @@ sleep = time.sleep from eventlet.support import get_errno, clear_sys_exc_info from eventlet.hubs.hub import BaseHub, READ, WRITE, noop + +if getattr(select, 'kqueue', None) is None: + raise ImportError('No kqueue implementation found in select module') + + FILTERS = {READ: select.KQ_FILTER_READ, WRITE: select.KQ_FILTER_WRITE} + class Hub(BaseHub): MAX_EVENTS = 100 @@ -47,7 +53,7 @@ class Hub(BaseHub): if evtype not in events: try: event = select.kevent(fileno, - FILTERS.get(evtype), select.KQ_EV_ADD) + FILTERS.get(evtype), select.KQ_EV_ADD) self._control([event], 0, 0) events[evtype] = event except ValueError: diff --git a/tests/hub_test.py b/tests/hub_test.py index 6fd1e3b..d062304 100644 --- a/tests/hub_test.py +++ b/tests/hub_test.py @@ -2,6 +2,7 @@ from __future__ import with_statement import sys from tests import LimitedTestCase, main, skip_with_pyevent, skip_if_no_itimer, skip_unless +from tests.patcher_test import ProcessBase import time import eventlet from eventlet import hubs @@ -11,11 +12,15 @@ from eventlet.semaphore import Semaphore from eventlet.support import greenlets DELAY = 0.001 + + def noop(): pass + class TestTimerCleanup(LimitedTestCase): TEST_TIMEOUT = 2 + @skip_with_pyevent def test_cancel_immediate(self): hub = hubs.get_hub() @@ -25,12 +30,11 @@ class TestTimerCleanup(LimitedTestCase): t = hubs.get_hub().schedule_call_global(60, noop) t.cancel() self.assert_less_than_equal(hub.timers_canceled, - hub.get_timers_count() + 1) + hub.get_timers_count() + 1) # there should be fewer than 1000 new timers and canceled self.assert_less_than_equal(hub.get_timers_count(), 1000 + stimers) self.assert_less_than_equal(hub.timers_canceled, 1000) - @skip_with_pyevent def test_cancel_accumulated(self): hub = hubs.get_hub() @@ -40,10 +44,10 @@ class TestTimerCleanup(LimitedTestCase): t = hubs.get_hub().schedule_call_global(60, noop) eventlet.sleep() self.assert_less_than_equal(hub.timers_canceled, - hub.get_timers_count() + 1) + hub.get_timers_count() + 1) t.cancel() self.assert_less_than_equal(hub.timers_canceled, - hub.get_timers_count() + 1, hub.timers) + hub.get_timers_count() + 1, hub.timers) # there should be fewer than 1000 new timers and canceled self.assert_less_than_equal(hub.get_timers_count(), 1000 + stimers) self.assert_less_than_equal(hub.timers_canceled, 1000) @@ -78,34 +82,36 @@ class TestTimerCleanup(LimitedTestCase): self.assert_less_than_equal(hub.timers_canceled, hub.get_timers_count()) eventlet.sleep() - + class TestScheduleCall(LimitedTestCase): + def test_local(self): lst = [1] eventlet.spawn(hubs.get_hub().schedule_call_local, DELAY, lst.pop) eventlet.sleep(0) - eventlet.sleep(DELAY*2) + eventlet.sleep(DELAY * 2) assert lst == [1], lst def test_global(self): lst = [1] eventlet.spawn(hubs.get_hub().schedule_call_global, DELAY, lst.pop) eventlet.sleep(0) - eventlet.sleep(DELAY*2) + eventlet.sleep(DELAY * 2) assert lst == [], lst - + def test_ordering(self): lst = [] - hubs.get_hub().schedule_call_global(DELAY*2, lst.append, 3) + hubs.get_hub().schedule_call_global(DELAY * 2, lst.append, 3) hubs.get_hub().schedule_call_global(DELAY, lst.append, 1) hubs.get_hub().schedule_call_global(DELAY, lst.append, 2) while len(lst) < 3: eventlet.sleep(DELAY) - self.assertEquals(lst, [1,2,3]) + self.assertEquals(lst, [1, 2, 3]) + - class TestDebug(LimitedTestCase): + def test_debug_listeners(self): hubs.get_hub().set_debug_listeners(True) hubs.get_hub().set_debug_listeners(False) @@ -113,19 +119,23 @@ class TestDebug(LimitedTestCase): def test_timer_exceptions(self): hubs.get_hub().set_timer_exceptions(True) hubs.get_hub().set_timer_exceptions(False) - + class TestExceptionInMainloop(LimitedTestCase): + def test_sleep(self): - # even if there was an error in the mainloop, the hub should continue to work + # even if there was an error in the mainloop, the hub should continue + # to work start = time.time() eventlet.sleep(DELAY) delay = time.time() - start - assert delay >= DELAY*0.9, 'sleep returned after %s seconds (was scheduled for %s)' % (delay, DELAY) + assert delay >= DELAY * \ + 0.9, 'sleep returned after %s seconds (was scheduled for %s)' % ( + delay, DELAY) def fail(): - 1//0 + 1 // 0 hubs.get_hub().schedule_call_global(0, fail) @@ -133,10 +143,13 @@ class TestExceptionInMainloop(LimitedTestCase): eventlet.sleep(DELAY) delay = time.time() - start - assert delay >= DELAY*0.9, 'sleep returned after %s seconds (was scheduled for %s)' % (delay, DELAY) + assert delay >= DELAY * \ + 0.9, 'sleep returned after %s seconds (was scheduled for %s)' % ( + delay, DELAY) class TestExceptionInGreenthread(LimitedTestCase): + @skip_unless(greenlets.preserves_excinfo) def test_exceptionpreservation(self): # events for controlling execution order @@ -193,6 +206,7 @@ class TestExceptionInGreenthread(LimitedTestCase): class TestHubSelection(LimitedTestCase): + def test_explicit_hub(self): if getattr(hubs.get_hub(), 'uses_twisted_reactor', None): # doesn't work with twisted @@ -207,6 +221,7 @@ class TestHubSelection(LimitedTestCase): class TestHubBlockingDetector(LimitedTestCase): TEST_TIMEOUT = 10 + @skip_with_pyevent def test_block_detect(self): def look_im_blocking(): @@ -230,10 +245,11 @@ class TestHubBlockingDetector(LimitedTestCase): gt = eventlet.spawn(look_im_blocking) self.assertRaises(RuntimeError, gt.wait) debug.hub_blocking_detection(False) - + class TestSuspend(LimitedTestCase): - TEST_TIMEOUT=3 + TEST_TIMEOUT = 3 + def test_suspend_doesnt_crash(self): import errno import os @@ -256,11 +272,11 @@ except eventlet.Timeout: python_path = os.pathsep.join(sys.path + [self.tempdir]) new_env = os.environ.copy() new_env['PYTHONPATH'] = python_path - p = subprocess.Popen([sys.executable, + p = subprocess.Popen([sys.executable, os.path.join(self.tempdir, filename)], - stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=new_env) + stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=new_env) eventlet.sleep(0.4) # wait for process to hit accept - os.kill(p.pid, signal.SIGSTOP) # suspend and resume to generate EINTR + os.kill(p.pid, signal.SIGSTOP) # suspend and resume to generate EINTR os.kill(p.pid, signal.SIGCONT) output, _ = p.communicate() lines = [l for l in output.split("\n") if l] @@ -269,15 +285,16 @@ except eventlet.Timeout: class TestBadFilenos(LimitedTestCase): + @skip_with_pyevent def test_repeated_selects(self): from eventlet.green import select self.assertRaises(ValueError, select.select, [-1], [], []) self.assertRaises(ValueError, select.select, [-1], [], []) - -from tests.patcher_test import ProcessBase + class TestFork(ProcessBase): + @skip_with_pyevent def test_fork(self): new_mod = """ @@ -297,7 +314,7 @@ if not pid: new_sock, address = server.accept() except eventlet.Timeout, t: print "accept blocked" - + else: kpid, status = os.wait() assert kpid == pid @@ -312,7 +329,7 @@ else: class TestDeadRunLoop(LimitedTestCase): - TEST_TIMEOUT=2 + TEST_TIMEOUT = 2 class CustomException(Exception): pass @@ -330,7 +347,7 @@ class TestDeadRunLoop(LimitedTestCase): eventlet.sleep(0) # let dummyproc run assert hub.greenlet.parent == eventlet.greenthread.getcurrent() self.assertRaises(KeyboardInterrupt, hub.greenlet.throw, - KeyboardInterrupt()) + KeyboardInterrupt()) # kill dummyproc, this schedules a timer to return execution to # this greenlet before throwing an exception in dummyproc. @@ -355,7 +372,7 @@ class TestDeadRunLoop(LimitedTestCase): g = eventlet.spawn(dummyproc) assert hub.greenlet.parent == eventlet.greenthread.getcurrent() self.assertRaises(KeyboardInterrupt, hub.greenlet.throw, - KeyboardInterrupt()) + KeyboardInterrupt()) assert not g.dead # check dummyproc hasn't completed with eventlet.Timeout(0.5, self.CustomException()): @@ -370,6 +387,41 @@ class TestDeadRunLoop(LimitedTestCase): class Foo(object): pass -if __name__=='__main__': - main() +class TestDefaultHub(ProcessBase): + + def test_kqueue_unsupported(self): + # https://github.com/eventlet/eventlet/issues/38 + # get_hub on windows broken by kqueue + module_source = r''' +# Simulate absence of kqueue even on platforms that support it. +import select +try: + del select.kqueue +except AttributeError: + pass + +import __builtin__ +original_import = __builtin__.__import__ + +def fail_import(name, *args, **kwargs): + if 'epoll' in name: + raise ImportError('disabled for test') + if 'kqueue' in name: + print('kqueue tried') + return original_import(name, *args, **kwargs) + +__builtin__.__import__ = fail_import + + +import eventlet.hubs +eventlet.hubs.get_default_hub() +print('ok') +''' + self.write_to_tempfile('newmod', module_source) + output, _ = self.launch_subprocess('newmod.py') + self.assertEqual(output, 'kqueue tried\nok\n') + + +if __name__ == '__main__': + main()