Added a cool little debug mode to Hub which makes it easier to see why a particular fileno is in the hub, by recording the traceback of when the fileno was added. Enable this mode by doing api.get_hub().debug = True. Tweaked tpool so that it only starts up its thread pool when you actually call execute(); I don't think this is a performance hit, and it makes tpool somewhat nicer to use. Some other random changes, like a more explicit package name in test_named so it passes when called from within nose.
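A minimal usage sketch of the new debug flag (not taken from this commit; it assumes only the api.get_hub() call from the message above and the hub.listeners / DebugListener repr shown in the hub.py diff below):

    from eventlet import api

    hub = api.get_hub()
    hub.debug = True   # listeners added from now on are DebugListener objects

    # ... run code that waits on file descriptors (sockets, pipes, etc.) ...

    # With debug on, each listener's repr ends with the stack trace of the
    # add() call that registered it, so dumping the hub's listeners shows
    # why every fileno is being waited on.
    print(hub.listeners)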

This commit is contained in:
Ryan Williams
2009-08-24 22:35:25 -07:00
parent de77cb06b0
commit 488f4abd42
7 changed files with 74 additions and 24 deletions

View File

@@ -25,8 +25,6 @@ import time
 from eventlet.support import greenlets as greenlet
 from eventlet.timer import Timer, LocalTimer
 
-_g_debug = True
-
 class FdListener(object):
     def __init__(self, evtype, fileno, cb):
         self.evtype = evtype
@@ -38,6 +36,18 @@ class FdListener(object):
         return "FdListener(%r, %r, %r)" % (self.evtype, self.fileno, self.cb)
     __str__ = __repr__
 
+# in debug mode, track the call site that created the listener
+class DebugListener(FdListener):
+    def __init__(self, evtype, fileno, cb):
+        import traceback
+        self.where_called = traceback.format_stack()
+        super(DebugListener, self).__init__(evtype, fileno, cb)
+    def __repr__(self):
+        return "DebugListener(%r, %r, %r)\n%sEndDebugFdListener" % (self.evtype, self.fileno, self.cb, ''.join(self.where_called))
+    __str__ = __repr__
+
+
 class BaseHub(object):
     """ Base hub class for easing the implementation of subclasses that are
     specific to a particular underlying event architecture. """
@@ -62,6 +72,7 @@ class BaseHub(object):
             'after_waiting': [],
             'exit': [],
         }
+        self.lclass = FdListener
 
     def add(self, evtype, fileno, cb):
         """ Signals an intent to or write a particular file descriptor.
@@ -73,7 +84,7 @@ class BaseHub(object):
         The *cb* argument is the callback which will be called when the file
         is ready for reading/writing.
         """
-        listener = FdListener(evtype, fileno, cb)
+        listener = self.lclass(evtype, fileno, cb)
         self.listeners[evtype].setdefault(fileno, []).append(listener)
         return listener
@@ -300,8 +311,13 @@ class BaseHub(object):
     def get_timers_count(hub):
         return max(len(x) for x in [hub.timers, hub.next_timers])
 
-    def describe_listeners(self):
-        import pprint
-        return pprint.pformat(self.listeners)
+    def _setdebug(self, value):
+        if value:
+            self.lclass = DebugListener
+        else:
+            self.lclass = FdListener
+
+    def _getdebug(self):
+        return self.lclass == DebugListener
+
+    debug = property(_getdebug, _setdebug)

View File

@@ -22,12 +22,7 @@ from eventlet import api, coros, greenio
 QUIET=False
 
-_rpipe, _wpipe = os.pipe()
-_rfile = os.fdopen(_rpipe,"r",0)
-## Work whether or not wrap_pipe_with_coroutine_pipe was called
-if not isinstance(_rfile, greenio.GreenPipe):
-    _rfile = greenio.GreenPipe(_rfile)
+_rpipe = _wpipe = _rfile = None
 
 def _signal_t2e():
     from eventlet import util
@@ -39,7 +34,10 @@ _rspq = Queue(maxsize=-1)
 def tpool_trampoline():
     global _reqq, _rspq
     while(True):
-        _c = _rfile.recv(1)
+        try:
+            _c = _rfile.recv(1)
+        except ValueError:
+            break  # will be raised when pipe is closed
         assert(_c != "")
         while not _rspq.empty():
             try:
@@ -95,6 +93,7 @@ def execute(meth,*args, **kwargs):
     """Execute method in a thread, blocking the current
     coroutine until the method completes.
     """
+    setup()
     e = esend(meth,*args,**kwargs)
     rv = erecv(e)
     return rv
@@ -166,21 +165,35 @@ class Proxy(object):
 
 _nthreads = int(os.environ.get('EVENTLET_THREADPOOL_SIZE', 20))
 _threads = {}
+_coro = None
+_setup_already = False
 def setup():
-    global _threads
+    global _rpipe, _wpipe, _rfile, _threads, _coro, _setup_already
+    if _setup_already:
+        return
+    else:
+        _setup_already = True
+    _rpipe, _wpipe = os.pipe()
+    _rfile = os.fdopen(_rpipe,"r",0)
+    ## Work whether or not wrap_pipe_with_coroutine_pipe was called
+    if not isinstance(_rfile, greenio.GreenPipe):
+        _rfile = greenio.GreenPipe(_rfile)
     for i in range(0,_nthreads):
         _threads[i] = threading.Thread(target=tworker)
         _threads[i].setDaemon(True)
         _threads[i].start()
-    api.spawn(tpool_trampoline)
-
-setup()
+    _coro = api.spawn(tpool_trampoline)
 
 def killall():
+    global _setup_already
+    if not _setup_already:
+        return
     for i in _threads:
         _reqq.put(None)
     for thr in _threads.values():
         thr.join()
+    _rfile.close()
+    api.kill(_coro)
+    _setup_already = False
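
A small usage sketch of the lazy-startup behavior introduced by the tpool.py change above (not part of the diff; time.sleep is just a stand-in workload):

    from eventlet import tpool
    import time

    # No explicit tpool.setup() call is needed any more: the first execute()
    # starts the worker threads and the trampoline coroutine on demand.
    tpool.execute(time.sleep, 0.1)

    # killall() drains the workers, closes the pipe, kills the trampoline
    # coroutine, and resets _setup_already so a later execute() can set up again.
    tpool.killall()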

View File

@@ -188,7 +188,7 @@ class TestApi(TestCase):
         check_hub()
 
     def test_named(self):
-        named_foo = api.named('api_test.Foo')
+        named_foo = api.named('tests.api_test.Foo')
         self.assertEquals(
             named_foo.__name__,
             "Foo")

View File

@@ -465,6 +465,7 @@ class TestTpoolConnectionPool(TestDBConnectionPool):
     def tearDown(self):
         from eventlet import tpool
         tpool.QUIET = False
+        tpool.killall()
         super(TestTpoolConnectionPool, self).tearDown()

View File

@@ -42,6 +42,14 @@ class TestScheduleCall(unittest.TestCase):
         assert lst == [], lst
 
+
+class TestDebug(unittest.TestCase):
+    def test_debug(self):
+        api.get_hub().debug = True
+        self.assert_(api.get_hub().debug)
+        api.get_hub().debug = False
+        self.assert_(not api.get_hub().debug)
+
 
 class TestExceptionInMainloop(unittest.TestCase):
     def test_sleep(self):

View File

@@ -64,9 +64,11 @@ class TestTpool(TestCase):
         # turn off exception printing, because we'll be deliberately
         # triggering exceptions in our tests
         tpool.QUIET = True
+        tpool.setup()
 
     def tearDown(self):
         tpool.QUIET = False
+        tpool.killall()
 
     def test_a_buncha_stuff(self):
         pool = coros.CoroutinePool(max_size=10)
@@ -183,6 +185,9 @@ class TestTpool(TestCase):
         self.assertRaises(api.TimeoutError,
                           tpool.execute, time.sleep, 0.3)
 
+    def test_killall(self):
+        tpool.killall()
+        tpool.setup()
+
     @skipped
     def test_benchmark(self):

View File

@@ -22,6 +22,7 @@
 import cgi
 import os
 
+from tests import skipped
 from unittest import TestCase, main
 
 from eventlet import api
@@ -129,9 +130,13 @@ class TestHttpd(TestCase):
     def setUp(self):
         self.logfile = StringIO()
         self.site = Site()
+        listener = api.tcp_listener(('0.0.0.0', 12346))
         self.killer = api.spawn(
             wsgi.server,
-            api.tcp_listener(('0.0.0.0', 12346)), self.site, max_size=128, log=self.logfile)
+            listener,
+            self.site,
+            max_size=128,
+            log=self.logfile)
 
     def tearDown(self):
         api.kill(self.killer)
@@ -184,7 +189,8 @@ class TestHttpd(TestCase):
         self.assertRaises(ConnectionClosed, read_http, sock)
         fd.close()
 
-    def skip_test_005_run_apachebench(self):
+    @skipped
+    def test_005_run_apachebench(self):
         url = 'http://localhost:12346/'
         # ab is apachebench
         out = processes.Process(find_command('ab'),
@@ -388,5 +394,6 @@ class TestHttpd(TestCase):
         self.assertEquals(1, len([l for l in headerlines
                                   if l.lower().startswith('content-length')]))
 
+
 if __name__ == '__main__':
     main()