From ceec9509950a0d20470a78c98e6d8402d1963ff5 Mon Sep 17 00:00:00 2001 From: Edward George Date: Tue, 19 Mar 2013 05:58:58 +0000 Subject: [PATCH] tpool: single request queue https://bitbucket.org/eventlet/eventlet/pull-request/32/use-single-request-queue-for-threadpool --- eventlet/tpool.py | 32 +++++++++++--------------------- 1 file changed, 11 insertions(+), 21 deletions(-) diff --git a/eventlet/tpool.py b/eventlet/tpool.py index e7d963d..ea6c3ca 100644 --- a/eventlet/tpool.py +++ b/eventlet/tpool.py @@ -39,6 +39,7 @@ def _signal_t2e(): _wfile.write(_bytetosend) _wfile.flush() +_reqq = None _rspq = None def tpool_trampoline(): @@ -61,11 +62,11 @@ def tpool_trampoline(): SYS_EXCS = (KeyboardInterrupt, SystemExit) EXC_CLASSES = (Exception, timeout.Timeout) -def tworker(reqq): +def tworker(): global _rspq while(True): try: - msg = reqq.get() + msg = _reqq.get() except AttributeError: return # can't get anything off of a dud queue if msg is None: @@ -104,17 +105,8 @@ def execute(meth,*args, **kwargs): if my_thread in _threads or imp.lock_held() or _nthreads == 0: return meth(*args, **kwargs) - cur = greenthread.getcurrent() - # a mini mixing function to make up for the fact that hash(greenlet) doesn't - # have much variability in the lower bits - k = hash(cur) - k = k + 0x2c865fd + (k >> 5) - k = k ^ 0xc84d1b7 ^ (k >> 7) - thread_index = k % _nthreads - - reqq, _thread = _threads[thread_index] e = event.Event() - reqq.put((e,meth,args,kwargs)) + _reqq.put((e,meth,args,kwargs)) rv = e.wait() if isinstance(rv,tuple) \ @@ -236,7 +228,7 @@ _threads = [] _coro = None _setup_already = False def setup(): - global _rfile, _wfile, _threads, _coro, _setup_already, _rspq + global _rfile, _wfile, _threads, _coro, _setup_already, _rspq, _reqq if _setup_already: return else: @@ -259,6 +251,7 @@ def setup(): _rfile = greenio.GreenSocket(csock).makefile('rb', 0) _wfile = nsock.makefile('wb',0) + _reqq = Queue(maxsize=-1) _rspq = Queue(maxsize=-1) assert _nthreads >= 0, "Can't specify negative number of threads" if _nthreads == 0: @@ -267,14 +260,11 @@ def setup(): execute in main thread. Check the value of the environment \ variable EVENTLET_THREADPOOL_SIZE.", RuntimeWarning) for i in xrange(_nthreads): - reqq = Queue(maxsize=-1) t = threading.Thread(target=tworker, - name="tpool_thread_%s" % i, - args=(reqq,)) + name="tpool_thread_%s" % i) t.setDaemon(True) t.start() - _threads.append((reqq, t)) - + _threads.append(t) _coro = greenthread.spawn_n(tpool_trampoline) @@ -283,9 +273,9 @@ def killall(): global _setup_already, _rspq, _rfile, _wfile if not _setup_already: return - for reqq, _ in _threads: - reqq.put(None) - for _, thr in _threads: + for thr in _threads: + _reqq.put(None) + for thr in _threads: thr.join() del _threads[:] if _coro is not None: