tpool: yield after setup() and before killall()

Fixes surprising errors in artificial IO-dry scenarios (like tests).
Won't hurt in real environment.
This commit is contained in:
Sergey Shepelev
2016-02-12 15:36:10 +05:00
parent 7f08dfd250
commit 7e6561e33f

View File

@@ -13,11 +13,13 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import atexit
import imp import imp
import os import os
import sys import sys
import traceback import traceback
import eventlet
from eventlet import event, greenio, greenthread, patcher, timeout from eventlet import event, greenio, greenthread, patcher, timeout
from eventlet.support import six from eventlet.support import six
@@ -39,7 +41,7 @@ if six.PY3:
Empty = Queue_module.Empty Empty = Queue_module.Empty
Queue = Queue_module.Queue Queue = Queue_module.Queue
_bytetosend = ' '.encode() _bytetosend = b' '
_coro = None _coro = None
_nthreads = int(os.environ.get('EVENTLET_THREADPOOL_SIZE', 20)) _nthreads = int(os.environ.get('EVENTLET_THREADPOOL_SIZE', 20))
_reqq = _rspq = None _reqq = _rspq = None
@@ -54,6 +56,7 @@ def tpool_trampoline():
try: try:
_c = _rsock.recv(1) _c = _rsock.recv(1)
assert _c assert _c
# FIXME: this is probably redundant since using sockets instead of pipe now
except ValueError: except ValueError:
break # will be raised when pipe is closed break # will be raised when pipe is closed
while not _rspq.empty(): while not _rspq.empty():
@@ -250,7 +253,7 @@ class Proxy(object):
def setup(): def setup():
global _rsock, _wsock, _threads, _coro, _setup_already, _rspq, _reqq global _rsock, _wsock, _coro, _setup_already, _rspq, _reqq
if _setup_already: if _setup_already:
return return
else: else:
@@ -271,7 +274,9 @@ def setup():
sock.listen(1) sock.listen(1)
csock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) csock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
csock.connect(sock.getsockname()) csock.connect(sock.getsockname())
csock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
_wsock, _addr = sock.accept() _wsock, _addr = sock.accept()
_wsock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
sock.close() sock.close()
_rsock = greenio.GreenSocket(csock) _rsock = greenio.GreenSocket(csock)
@@ -283,12 +288,20 @@ def setup():
_threads.append(t) _threads.append(t)
_coro = greenthread.spawn_n(tpool_trampoline) _coro = greenthread.spawn_n(tpool_trampoline)
# This yield fixes subtle error with GreenSocket.__del__
eventlet.sleep(0)
# Avoid ResourceWarning unclosed socket on Python3.2+
@atexit.register
def killall(): def killall():
global _setup_already, _rspq, _rsock, _wsock global _setup_already, _rspq, _rsock, _wsock
if not _setup_already: if not _setup_already:
return return
# This yield fixes freeze in some scenarios
eventlet.sleep(0)
for thr in _threads: for thr in _threads:
_reqq.put(None) _reqq.put(None)
for thr in _threads: for thr in _threads:
@@ -296,7 +309,7 @@ def killall():
del _threads[:] del _threads[:]
# return any remaining results # return any remaining results
while not _rspq.empty(): while (_rspq is not None) and not _rspq.empty():
try: try:
(e, rv) = _rspq.get(block=False) (e, rv) = _rspq.get(block=False)
e.send(rv) e.send(rv)
@@ -306,10 +319,12 @@ def killall():
if _coro is not None: if _coro is not None:
greenthread.kill(_coro) greenthread.kill(_coro)
_rsock.close() if _rsock is not None:
_wsock.close() _rsock.close()
_rsock = None _rsock = None
_wsock = None if _wsock is not None:
_wsock.close()
_wsock = None
_rspq = None _rspq = None
_setup_already = False _setup_already = False