Edward George
2013-03-19 05:58:58 +00:00
committed by Sergey Shepelev
parent 0195326cbe
commit ceec950995

View File

@@ -39,6 +39,7 @@ def _signal_t2e():
_wfile.write(_bytetosend) _wfile.write(_bytetosend)
_wfile.flush() _wfile.flush()
_reqq = None
_rspq = None _rspq = None
def tpool_trampoline(): def tpool_trampoline():
@@ -61,11 +62,11 @@ def tpool_trampoline():
SYS_EXCS = (KeyboardInterrupt, SystemExit) SYS_EXCS = (KeyboardInterrupt, SystemExit)
EXC_CLASSES = (Exception, timeout.Timeout) EXC_CLASSES = (Exception, timeout.Timeout)
def tworker(reqq): def tworker():
global _rspq global _rspq
while(True): while(True):
try: try:
msg = reqq.get() msg = _reqq.get()
except AttributeError: except AttributeError:
return # can't get anything off of a dud queue return # can't get anything off of a dud queue
if msg is None: if msg is None:
@@ -104,17 +105,8 @@ def execute(meth,*args, **kwargs):
if my_thread in _threads or imp.lock_held() or _nthreads == 0: if my_thread in _threads or imp.lock_held() or _nthreads == 0:
return meth(*args, **kwargs) return meth(*args, **kwargs)
cur = greenthread.getcurrent()
# a mini mixing function to make up for the fact that hash(greenlet) doesn't
# have much variability in the lower bits
k = hash(cur)
k = k + 0x2c865fd + (k >> 5)
k = k ^ 0xc84d1b7 ^ (k >> 7)
thread_index = k % _nthreads
reqq, _thread = _threads[thread_index]
e = event.Event() e = event.Event()
reqq.put((e,meth,args,kwargs)) _reqq.put((e,meth,args,kwargs))
rv = e.wait() rv = e.wait()
if isinstance(rv,tuple) \ if isinstance(rv,tuple) \
@@ -236,7 +228,7 @@ _threads = []
_coro = None _coro = None
_setup_already = False _setup_already = False
def setup(): def setup():
global _rfile, _wfile, _threads, _coro, _setup_already, _rspq global _rfile, _wfile, _threads, _coro, _setup_already, _rspq, _reqq
if _setup_already: if _setup_already:
return return
else: else:
@@ -259,6 +251,7 @@ def setup():
_rfile = greenio.GreenSocket(csock).makefile('rb', 0) _rfile = greenio.GreenSocket(csock).makefile('rb', 0)
_wfile = nsock.makefile('wb',0) _wfile = nsock.makefile('wb',0)
_reqq = Queue(maxsize=-1)
_rspq = Queue(maxsize=-1) _rspq = Queue(maxsize=-1)
assert _nthreads >= 0, "Can't specify negative number of threads" assert _nthreads >= 0, "Can't specify negative number of threads"
if _nthreads == 0: if _nthreads == 0:
@@ -267,14 +260,11 @@ def setup():
execute in main thread. Check the value of the environment \ execute in main thread. Check the value of the environment \
variable EVENTLET_THREADPOOL_SIZE.", RuntimeWarning) variable EVENTLET_THREADPOOL_SIZE.", RuntimeWarning)
for i in xrange(_nthreads): for i in xrange(_nthreads):
reqq = Queue(maxsize=-1)
t = threading.Thread(target=tworker, t = threading.Thread(target=tworker,
name="tpool_thread_%s" % i, name="tpool_thread_%s" % i)
args=(reqq,))
t.setDaemon(True) t.setDaemon(True)
t.start() t.start()
_threads.append((reqq, t)) _threads.append(t)
_coro = greenthread.spawn_n(tpool_trampoline) _coro = greenthread.spawn_n(tpool_trampoline)
@@ -283,9 +273,9 @@ def killall():
global _setup_already, _rspq, _rfile, _wfile global _setup_already, _rspq, _rfile, _wfile
if not _setup_already: if not _setup_already:
return return
for reqq, _ in _threads: for thr in _threads:
reqq.put(None) _reqq.put(None)
for _, thr in _threads: for thr in _threads:
thr.join() thr.join()
del _threads[:] del _threads[:]
if _coro is not None: if _coro is not None: