Added multiple-reader prevention code because it seems to be a fairly common pitfall. Also added defaults to the debug methods so you can call them no-args to reset them.
This commit is contained in:
@@ -93,7 +93,7 @@ def format_hub_timers():
|
|||||||
result.append(repr(l))
|
result.append(repr(l))
|
||||||
return os.linesep.join(result)
|
return os.linesep.join(result)
|
||||||
|
|
||||||
def hub_listener_stacks(state):
|
def hub_listener_stacks(state = False):
|
||||||
"""Toggles whether or not the hub records the stack when clients register
|
"""Toggles whether or not the hub records the stack when clients register
|
||||||
listeners on file descriptors. This can be useful when trying to figure
|
listeners on file descriptors. This can be useful when trying to figure
|
||||||
out what the hub is up to at any given moment. To inspect the stacks
|
out what the hub is up to at any given moment. To inspect the stacks
|
||||||
@@ -103,7 +103,7 @@ def hub_listener_stacks(state):
|
|||||||
from eventlet import hubs
|
from eventlet import hubs
|
||||||
hubs.get_hub().set_debug_listeners(state)
|
hubs.get_hub().set_debug_listeners(state)
|
||||||
|
|
||||||
def hub_timer_stacks(state):
|
def hub_timer_stacks(state = False):
|
||||||
"""Toggles whether or not the hub records the stack when timers are set.
|
"""Toggles whether or not the hub records the stack when timers are set.
|
||||||
To inspect the stacks of the current timers, call :func:`format_hub_timers`
|
To inspect the stacks of the current timers, call :func:`format_hub_timers`
|
||||||
at critical junctures in the application logic.
|
at critical junctures in the application logic.
|
||||||
@@ -111,7 +111,11 @@ def hub_timer_stacks(state):
|
|||||||
from eventlet.hubs import timer
|
from eventlet.hubs import timer
|
||||||
timer._g_debug = state
|
timer._g_debug = state
|
||||||
|
|
||||||
def hub_exceptions(state):
|
def hub_prevent_multiple_readers(state = True):
|
||||||
|
from eventlet.hubs import hub
|
||||||
|
hub.g_prevent_multiple_readers = state
|
||||||
|
|
||||||
|
def hub_exceptions(state = True):
|
||||||
"""Toggles whether the hub prints exceptions that are raised from its
|
"""Toggles whether the hub prints exceptions that are raised from its
|
||||||
timers. This can be useful to see how greenthreads are terminating.
|
timers. This can be useful to see how greenthreads are terminating.
|
||||||
"""
|
"""
|
||||||
@@ -120,7 +124,7 @@ def hub_exceptions(state):
|
|||||||
from eventlet import greenpool
|
from eventlet import greenpool
|
||||||
greenpool.DEBUG = state
|
greenpool.DEBUG = state
|
||||||
|
|
||||||
def tpool_exceptions(state):
|
def tpool_exceptions(state = False):
|
||||||
"""Toggles whether tpool itself prints exceptions that are raised from
|
"""Toggles whether tpool itself prints exceptions that are raised from
|
||||||
functions that are executed in it, in addition to raising them like
|
functions that are executed in it, in addition to raising them like
|
||||||
it normally does."""
|
it normally does."""
|
||||||
|
@@ -1,12 +1,15 @@
|
|||||||
import heapq
|
import heapq
|
||||||
import sys
|
import sys
|
||||||
import traceback
|
import traceback
|
||||||
|
import warnings
|
||||||
|
|
||||||
from eventlet.support import greenlets as greenlet, clear_sys_exc_info
|
from eventlet.support import greenlets as greenlet, clear_sys_exc_info
|
||||||
from eventlet.hubs import timer
|
from eventlet.hubs import timer
|
||||||
from eventlet import patcher
|
from eventlet import patcher
|
||||||
time = patcher.original('time')
|
time = patcher.original('time')
|
||||||
|
|
||||||
|
g_prevent_multiple_readers = True
|
||||||
|
|
||||||
READ="read"
|
READ="read"
|
||||||
WRITE="write"
|
WRITE="write"
|
||||||
|
|
||||||
@@ -74,6 +77,15 @@ class BaseHub(object):
|
|||||||
listener = self.lclass(evtype, fileno, cb)
|
listener = self.lclass(evtype, fileno, cb)
|
||||||
bucket = self.listeners[evtype]
|
bucket = self.listeners[evtype]
|
||||||
if fileno in bucket:
|
if fileno in bucket:
|
||||||
|
if g_prevent_multiple_readers:
|
||||||
|
raise RuntimeError("Second simultaneous %s on fileno %s "\
|
||||||
|
"detected. Unless you really know what you're doing, "\
|
||||||
|
"make sure that only one greenthread can %s any "\
|
||||||
|
"particular socket. Consider using a pools.Pool. "\
|
||||||
|
"If you do know what you're doing and want to disable "\
|
||||||
|
"this error, call "\
|
||||||
|
"eventlet.debug.hub_multiple_reader_prevention(False)" % (
|
||||||
|
evtype, fileno, evtype))
|
||||||
# store off the second listener in another structure
|
# store off the second listener in another structure
|
||||||
self.secondaries[evtype].setdefault(fileno, []).append(listener)
|
self.secondaries[evtype].setdefault(fileno, []).append(listener)
|
||||||
else:
|
else:
|
||||||
|
@@ -483,11 +483,34 @@ class TestGreenIo(LimitedTestCase):
|
|||||||
|
|
||||||
gt.wait()
|
gt.wait()
|
||||||
|
|
||||||
|
@skip_with_pyevent
|
||||||
|
def test_raised_multiple_readers(self):
|
||||||
|
debug.hub_prevent_multiple_readers(True)
|
||||||
|
|
||||||
|
def handle(sock, addr):
|
||||||
|
sock.recv(1)
|
||||||
|
sock.sendall("a")
|
||||||
|
raise eventlet.StopServe()
|
||||||
|
listener = eventlet.listen(('127.0.0.1', 0))
|
||||||
|
server = eventlet.spawn(eventlet.serve,
|
||||||
|
listener,
|
||||||
|
handle)
|
||||||
|
def reader(s):
|
||||||
|
s.recv(1)
|
||||||
|
|
||||||
|
s = eventlet.connect(('127.0.0.1', listener.getsockname()[1]))
|
||||||
|
a = eventlet.spawn(reader, s)
|
||||||
|
eventlet.sleep(0)
|
||||||
|
self.assertRaises(RuntimeError, s.recv, 1)
|
||||||
|
s.sendall('b')
|
||||||
|
a.wait()
|
||||||
|
|
||||||
|
|
||||||
class TestGreenIoLong(LimitedTestCase):
|
class TestGreenIoLong(LimitedTestCase):
|
||||||
TEST_TIMEOUT=10 # the test here might take a while depending on the OS
|
TEST_TIMEOUT=10 # the test here might take a while depending on the OS
|
||||||
@skip_with_pyevent
|
@skip_with_pyevent
|
||||||
def test_multiple_readers(self, clibufsize=False):
|
def test_multiple_readers(self, clibufsize=False):
|
||||||
|
debug.hub_prevent_multiple_readers(False)
|
||||||
recvsize = 2 * min_buf_size()
|
recvsize = 2 * min_buf_size()
|
||||||
sendsize = 10 * recvsize
|
sendsize = 10 * recvsize
|
||||||
# test that we can have multiple coroutines reading
|
# test that we can have multiple coroutines reading
|
||||||
@@ -534,6 +557,7 @@ class TestGreenIoLong(LimitedTestCase):
|
|||||||
listener.close()
|
listener.close()
|
||||||
self.assert_(len(results1) > 0)
|
self.assert_(len(results1) > 0)
|
||||||
self.assert_(len(results2) > 0)
|
self.assert_(len(results2) > 0)
|
||||||
|
debug.hub_prevent_multiple_readers()
|
||||||
|
|
||||||
@skipped # by rdw because it fails but it's not clear how to make it pass
|
@skipped # by rdw because it fails but it's not clear how to make it pass
|
||||||
@skip_with_pyevent
|
@skip_with_pyevent
|
||||||
|
@@ -3,6 +3,8 @@
|
|||||||
Many of these tests make connections to external servers, and all.py tries to skip these tests rather than failing them, so you can get some work done on a plane.
|
Many of these tests make connections to external servers, and all.py tries to skip these tests rather than failing them, so you can get some work done on a plane.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
from eventlet import debug
|
||||||
|
debug.hub_prevent_multiple_readers(False)
|
||||||
|
|
||||||
def restart_hub():
|
def restart_hub():
|
||||||
from eventlet import hubs
|
from eventlet import hubs
|
||||||
|
Reference in New Issue
Block a user