diff --git a/eventlet/hubs/hub.py b/eventlet/hubs/hub.py index a3d2cc6..bacfd69 100644 --- a/eventlet/hubs/hub.py +++ b/eventlet/hubs/hub.py @@ -25,8 +25,6 @@ import time from eventlet.support import greenlets as greenlet from eventlet.timer import Timer, LocalTimer -_g_debug = True - class FdListener(object): def __init__(self, evtype, fileno, cb): self.evtype = evtype @@ -37,6 +35,18 @@ class FdListener(object): def __repr__(self): return "FdListener(%r, %r, %r)" % (self.evtype, self.fileno, self.cb) __str__ = __repr__ + + +# in debug mode, track the call site that created the listener +class DebugListener(FdListener): + def __init__(self, evtype, fileno, cb): + import traceback + self.where_called = traceback.format_stack() + super(DebugListener, self).__init__(evtype, fileno, cb) + def __repr__(self): + return "DebugListener(%r, %r, %r)\n%sEndDebugFdListener" % (self.evtype, self.fileno, self.cb, ''.join(self.where_called)) + __str__ = __repr__ + class BaseHub(object): """ Base hub class for easing the implementation of subclasses that are @@ -62,6 +72,7 @@ class BaseHub(object): 'after_waiting': [], 'exit': [], } + self.lclass = FdListener def add(self, evtype, fileno, cb): """ Signals an intent to or write a particular file descriptor. @@ -73,7 +84,7 @@ class BaseHub(object): The *cb* argument is the callback which will be called when the file is ready for reading/writing. """ - listener = FdListener(evtype, fileno, cb) + listener = self.lclass(evtype, fileno, cb) self.listeners[evtype].setdefault(fileno, []).append(listener) return listener @@ -300,8 +311,13 @@ class BaseHub(object): def get_timers_count(hub): return max(len(x) for x in [hub.timers, hub.next_timers]) - def describe_listeners(self): - import pprint - return pprint.pformat(self.listeners) + def _setdebug(self, value): + if value: + self.lclass = DebugListener + else: + self.lclass = FdListener - + def _getdebug(self): + return self.lclass == DebugListener + + debug = property(_getdebug, _setdebug) \ No newline at end of file diff --git a/eventlet/tpool.py b/eventlet/tpool.py index 5acf365..217ac66 100644 --- a/eventlet/tpool.py +++ b/eventlet/tpool.py @@ -22,12 +22,7 @@ from eventlet import api, coros, greenio QUIET=False -_rpipe, _wpipe = os.pipe() -_rfile = os.fdopen(_rpipe,"r",0) -## Work whether or not wrap_pipe_with_coroutine_pipe was called -if not isinstance(_rfile, greenio.GreenPipe): - _rfile = greenio.GreenPipe(_rfile) - +_rpipe = _wpipe = _rfile = None def _signal_t2e(): from eventlet import util @@ -39,7 +34,10 @@ _rspq = Queue(maxsize=-1) def tpool_trampoline(): global _reqq, _rspq while(True): - _c = _rfile.recv(1) + try: + _c = _rfile.recv(1) + except ValueError: + break # will be raised when pipe is closed assert(_c != "") while not _rspq.empty(): try: @@ -95,6 +93,7 @@ def execute(meth,*args, **kwargs): """Execute method in a thread, blocking the current coroutine until the method completes. """ + setup() e = esend(meth,*args,**kwargs) rv = erecv(e) return rv @@ -166,21 +165,35 @@ class Proxy(object): _nthreads = int(os.environ.get('EVENTLET_THREADPOOL_SIZE', 20)) _threads = {} +_coro = None +_setup_already = False def setup(): - global _threads + if _setup_already: + return + else: + _setup_already = True + global _rpipe, _wpipe, _rfile, _threads, _coro, _setup_already + _rpipe, _wpipe = os.pipe() + _rfile = os.fdopen(_rpipe,"r",0) + ## Work whether or not wrap_pipe_with_coroutine_pipe was called + if not isinstance(_rfile, greenio.GreenPipe): + _rfile = greenio.GreenPipe(_rfile) + for i in range(0,_nthreads): _threads[i] = threading.Thread(target=tworker) _threads[i].setDaemon(True) _threads[i].start() - api.spawn(tpool_trampoline) - -setup() - + _coro = api.spawn(tpool_trampoline) def killall(): + global _setup_already + if not _setup_already: + return for i in _threads: _reqq.put(None) for thr in _threads.values(): thr.join() - + _rfile.close() + api.kill(_coro) + _setup_already = False \ No newline at end of file diff --git a/tests/api_test.py b/tests/api_test.py index 5ba8248..ddf2fd6 100644 --- a/tests/api_test.py +++ b/tests/api_test.py @@ -188,7 +188,7 @@ class TestApi(TestCase): check_hub() def test_named(self): - named_foo = api.named('api_test.Foo') + named_foo = api.named('tests.api_test.Foo') self.assertEquals( named_foo.__name__, "Foo") diff --git a/tests/db_pool_test.py b/tests/db_pool_test.py index 2a77311..e28e25e 100644 --- a/tests/db_pool_test.py +++ b/tests/db_pool_test.py @@ -465,6 +465,7 @@ class TestTpoolConnectionPool(TestDBConnectionPool): def tearDown(self): from eventlet import tpool tpool.QUIET = False + tpool.killall() super(TestTpoolConnectionPool, self).tearDown() diff --git a/tests/test__hub.py b/tests/test__hub.py index 2000d41..c3e35bc 100644 --- a/tests/test__hub.py +++ b/tests/test__hub.py @@ -41,6 +41,14 @@ class TestScheduleCall(unittest.TestCase): api.sleep(DELAY*2) assert lst == [], lst + +class TestDebug(unittest.TestCase): + def test_debug(self): + api.get_hub().debug = True + self.assert_(api.get_hub().debug) + api.get_hub().debug = False + self.assert_(not api.get_hub().debug) + class TestExceptionInMainloop(unittest.TestCase): diff --git a/tests/tpool_test.py b/tests/tpool_test.py index 35fc2a6..4a175c3 100644 --- a/tests/tpool_test.py +++ b/tests/tpool_test.py @@ -64,9 +64,11 @@ class TestTpool(TestCase): # turn off exception printing, because we'll be deliberately # triggering exceptions in our tests tpool.QUIET = True + tpool.setup() def tearDown(self): tpool.QUIET = False + tpool.killall() def test_a_buncha_stuff(self): pool = coros.CoroutinePool(max_size=10) @@ -183,7 +185,10 @@ class TestTpool(TestCase): self.assertRaises(api.TimeoutError, tpool.execute, time.sleep, 0.3) - + def test_killall(self): + tpool.killall() + tpool.setup() + @skipped def test_benchmark(self): """ Benchmark computing the amount of overhead tpool adds to function calls.""" diff --git a/tests/wsgi_test.py b/tests/wsgi_test.py index fcda3cb..1833a17 100644 --- a/tests/wsgi_test.py +++ b/tests/wsgi_test.py @@ -22,6 +22,7 @@ import cgi import os +from tests import skipped from unittest import TestCase, main from eventlet import api @@ -129,9 +130,13 @@ class TestHttpd(TestCase): def setUp(self): self.logfile = StringIO() self.site = Site() + listener = api.tcp_listener(('0.0.0.0', 12346)) self.killer = api.spawn( wsgi.server, - api.tcp_listener(('0.0.0.0', 12346)), self.site, max_size=128, log=self.logfile) + listener, + self.site, + max_size=128, + log=self.logfile) def tearDown(self): api.kill(self.killer) @@ -184,7 +189,8 @@ class TestHttpd(TestCase): self.assertRaises(ConnectionClosed, read_http, sock) fd.close() - def skip_test_005_run_apachebench(self): + @skipped + def test_005_run_apachebench(self): url = 'http://localhost:12346/' # ab is apachebench out = processes.Process(find_command('ab'), @@ -387,6 +393,7 @@ class TestHttpd(TestCase): headerlines = fd.readuntil('\r\n\r\n').splitlines() self.assertEquals(1, len([l for l in headerlines if l.lower().startswith('content-length')])) + if __name__ == '__main__': main()