403 lines
13 KiB
Python
403 lines
13 KiB
Python
from __future__ import with_statement
|
|
import sys
|
|
import time
|
|
|
|
import tests
|
|
from tests import skip_with_pyevent, skip_if_no_itimer, skip_unless
|
|
from tests.patcher_test import ProcessBase
|
|
import eventlet
|
|
from eventlet import hubs
|
|
from eventlet.support import greenlets, six
|
|
|
|
|
|
# Base delay in seconds shared by the timer and sleep tests in this module.
DELAY = 0.001
|
|
|
|
|
|
def noop():
    """Do nothing; a throwaway callback for the timers scheduled in tests."""
    return None
|
|
|
|
|
|
class TestTimerCleanup(tests.LimitedTestCase):
    """Checks on how the hub accounts for canceled timers.

    Each test schedules long-lived (60 second) no-op timers, cancels some
    or all of them, and compares ``hub.timers_canceled`` with
    ``hub.get_timers_count()`` along the way.
    """
    TEST_TIMEOUT = 2

    @skip_with_pyevent
    def test_cancel_immediate(self):
        """Cancel every timer right after scheduling it, without yielding."""
        hub = hubs.get_hub()
        stimers = hub.get_timers_count()
        for _ in six.moves.range(2000):
            t = hub.schedule_call_global(60, noop)
            t.cancel()
            self.assert_less_than_equal(hub.timers_canceled,
                                        hub.get_timers_count() + 1)
        # there should be fewer than 1000 new timers and canceled
        self.assert_less_than_equal(hub.get_timers_count(), 1000 + stimers)
        self.assert_less_than_equal(hub.timers_canceled, 1000)

    @skip_with_pyevent
    def test_cancel_accumulated(self):
        """Yield to the hub between scheduling and canceling each timer."""
        hub = hubs.get_hub()
        stimers = hub.get_timers_count()
        for _ in six.moves.range(2000):
            t = hub.schedule_call_global(60, noop)
            eventlet.sleep()
            self.assert_less_than_equal(hub.timers_canceled,
                                        hub.get_timers_count() + 1)
            t.cancel()
            # hub.timers is passed as the failure message to aid debugging
            self.assert_less_than_equal(hub.timers_canceled,
                                        hub.get_timers_count() + 1, hub.timers)
        # there should be fewer than 1000 new timers and canceled
        self.assert_less_than_equal(hub.get_timers_count(), 1000 + stimers)
        self.assert_less_than_equal(hub.timers_canceled, 1000)

    @skip_with_pyevent
    def test_cancel_proportion(self):
        """Cancel only one timer in three, then cancel the rest afterwards."""
        # if fewer than half the pending timers are canceled, it should
        # not clean them out
        hub = hubs.get_hub()
        uncanceled_timers = []
        stimers = hub.get_timers_count()
        for _ in six.moves.range(1000):
            # 2/3rds of new timers are uncanceled
            t = hub.schedule_call_global(60, noop)
            t2 = hub.schedule_call_global(60, noop)
            t3 = hub.schedule_call_global(60, noop)
            eventlet.sleep()
            self.assert_less_than_equal(hub.timers_canceled,
                                        hub.get_timers_count() + 1)
            t.cancel()
            self.assert_less_than_equal(hub.timers_canceled,
                                        hub.get_timers_count() + 1)
            uncanceled_timers.append(t2)
            uncanceled_timers.append(t3)
        # 3000 new timers, plus a few extras
        self.assert_less_than_equal(stimers + 3000,
                                    stimers + hub.get_timers_count())
        self.assertEqual(hub.timers_canceled, 1000)
        for t in uncanceled_timers:
            t.cancel()
            self.assert_less_than_equal(hub.timers_canceled,
                                        hub.get_timers_count())
        eventlet.sleep()
|
|
|
|
|
|
class TestScheduleCall(tests.LimitedTestCase):
    """Exercises ``schedule_call_local`` and ``schedule_call_global``."""

    def test_local(self):
        """A local timer scheduled from a spawned greenthread must not fire.

        NOTE(review): presumably the timer is dropped because the greenthread
        that scheduled it has already finished -- confirm against the hub.
        """
        values = [1]
        hub = hubs.get_hub()
        eventlet.spawn(hub.schedule_call_local, DELAY, values.pop)
        # first let the spawned greenthread run, then wait past the delay
        eventlet.sleep(0)
        eventlet.sleep(DELAY * 2)
        assert values == [1], values

    def test_global(self):
        """A global timer scheduled from a spawned greenthread must fire."""
        values = [1]
        hub = hubs.get_hub()
        eventlet.spawn(hub.schedule_call_global, DELAY, values.pop)
        # first let the spawned greenthread run, then wait past the delay
        eventlet.sleep(0)
        eventlet.sleep(DELAY * 2)
        assert values == [], values

    def test_ordering(self):
        """Timers run by due time; equal due times keep scheduling order."""
        fired = []
        record = fired.append
        hubs.get_hub().schedule_call_global(DELAY * 2, record, 3)
        hubs.get_hub().schedule_call_global(DELAY, record, 1)
        hubs.get_hub().schedule_call_global(DELAY, record, 2)
        # keep yielding until all three callbacks have run
        while len(fired) < 3:
            eventlet.sleep(DELAY)
        self.assertEqual(fired, [1, 2, 3])
|
|
|
|
|
|
class TestDebug(tests.LimitedTestCase):
    """Smoke tests: toggling the hub debug switches must not raise."""

    def test_debug_listeners(self):
        """Turn listener debugging on, then off again."""
        for enabled in (True, False):
            hubs.get_hub().set_debug_listeners(enabled)

    def test_timer_exceptions(self):
        """Turn timer-exception reporting on, then off again."""
        for enabled in (True, False):
            hubs.get_hub().set_timer_exceptions(enabled)
|
|
|
|
|
|
class TestExceptionInMainloop(tests.LimitedTestCase):
    """The hub must keep working after a timer callback raises."""

    def test_sleep(self):
        """Sleep, schedule a timer that raises, then check sleep again."""
        # even if there was an error in the mainloop, the hub should continue
        # to work
        message = 'sleep returned after %s seconds (was scheduled for %s)'

        def timed_sleep():
            # wall-clock duration of a single eventlet.sleep(DELAY)
            began = time.time()
            eventlet.sleep(DELAY)
            return time.time() - began

        delay = timed_sleep()
        assert delay >= DELAY * 0.9, message % (delay, DELAY)

        def fail():
            # raises ZeroDivisionError when the hub invokes it
            1 // 0

        hubs.get_hub().schedule_call_global(0, fail)

        delay = timed_sleep()
        assert delay >= DELAY * 0.9, message % (delay, DELAY)
|
|
|
|
|
|
class TestExceptionInGreenthread(tests.LimitedTestCase):
    """Checks what ``sys.exc_info()`` reports across greenthread switches."""

    @skip_unless(greenlets.preserves_excinfo)
    def test_exceptionpreservation(self):
        """Each greenthread keeps seeing its own in-flight exception.

        Two greenthreads handle different exceptions and hand control back
        and forth through events; after every switch each one asserts that
        ``sys.exc_info()`` still names its own exception type.
        """
        # events for controlling execution order
        first_ready = eventlet.Event()
        second_ready = eventlet.Event()

        def first():
            try:
                raise KeyError()
            except KeyError:
                first_ready.send('exception')
                second_ready.wait()
                assert sys.exc_info()[0] is KeyError
                first_ready.send('test passed')

        def second():
            first_ready.wait()
            first_ready.reset()
            # the KeyError being handled in first() must not be visible here
            assert sys.exc_info()[0] is None
            try:
                raise ValueError()
            except ValueError:
                second_ready.send('exception')
                first_ready.wait()
                assert sys.exc_info()[0] is ValueError

        first_gt = eventlet.spawn(first)
        second_gt = eventlet.spawn(second)
        try:
            first_gt.wait()
            second_gt.wait()
        finally:
            first_gt.kill()
            second_gt.kill()

    def test_exceptionleaks(self):
        """An exception handled in a greenthread must not leak to the caller."""
        # tests expected behaviour with all versions of greenlet
        def holder(lock):
            try:
                raise KeyError()
            except KeyError:
                # wake the test while still inside the handler, then park
                lock.release()
                hubs.get_hub().switch()

        # semaphores for controlling execution order
        gate = eventlet.Semaphore()
        gate.acquire()
        gt = eventlet.spawn(holder, gate)
        try:
            gate.acquire()
            assert sys.exc_info()[0] is None
        finally:
            gt.kill()
|
|
|
|
|
|
class TestHubSelection(tests.LimitedTestCase):
    """Checks that ``hubs.use_hub`` accepts an explicit hub class."""

    def test_explicit_hub(self):
        """Select ``Foo`` as the hub and verify ``get_hub()`` returns one."""
        previous_hub = hubs.get_hub()
        try:
            hubs.use_hub(Foo)
            current = hubs.get_hub()
            assert isinstance(current, Foo), hubs.get_hub()
        finally:
            # put the real hub back for the tests that follow
            hubs._threadlocal.hub = previous_hub
|
|
|
|
|
|
class TestHubBlockingDetector(tests.LimitedTestCase):
    """Tests for ``eventlet.debug.hub_blocking_detection``.

    Each test enables detection, spawns a greenthread that blocks in
    ``time.sleep`` and expects ``RuntimeError`` from waiting on it.
    Detection is switched off in a ``finally`` clause so a failing
    assertion cannot leave the detector enabled for later tests.
    """
    TEST_TIMEOUT = 10

    @skip_with_pyevent
    def test_block_detect(self):
        """Block for 2 seconds with detection at its default resolution."""
        def look_im_blocking():
            import time
            time.sleep(2)

        from eventlet import debug
        debug.hub_blocking_detection(True)
        try:
            gt = eventlet.spawn(look_im_blocking)
            self.assertRaises(RuntimeError, gt.wait)
        finally:
            debug.hub_blocking_detection(False)

    @skip_with_pyevent
    @skip_if_no_itimer
    def test_block_detect_with_itimer(self):
        """Block for 0.5 seconds with a 0.1 second detection resolution."""
        def look_im_blocking():
            import time
            time.sleep(0.5)

        from eventlet import debug
        debug.hub_blocking_detection(True, resolution=0.1)
        try:
            gt = eventlet.spawn(look_im_blocking)
            self.assertRaises(RuntimeError, gt.wait)
        finally:
            debug.hub_blocking_detection(False)
|
|
|
|
|
|
class TestSuspend(tests.LimitedTestCase):
    """Checks that a suspended and resumed eventlet process keeps working."""
    TEST_TIMEOUT = 4
    longMessage = True
    maxDiff = None

    def test_suspend_doesnt_crash(self):
        """Stop and continue a child blocked in ``accept()``.

        The child script arms a 0.5 second ``eventlet.Timeout`` and blocks
        in ``accept()``. The parent sends SIGSTOP then SIGCONT while the
        child is blocked, and expects the child's last output line to
        contain "exited correctly".
        """
        import os
        import shutil
        import signal
        import subprocess
        import sys
        import tempfile
        self.tempdir = tempfile.mkdtemp('test_suspend')
        try:
            filename = os.path.join(self.tempdir, 'test_suspend.py')
            # the context manager closes the file even if the write fails
            with open(filename, "w") as fd:
                fd.write("""import eventlet
eventlet.Timeout(0.5)
try:
    eventlet.listen(("127.0.0.1", 0)).accept()
except eventlet.Timeout:
    print("exited correctly")
""")
            python_path = os.pathsep.join(sys.path + [self.tempdir])
            new_env = os.environ.copy()
            new_env['PYTHONPATH'] = python_path
            # filename already includes self.tempdir, so it is used as-is
            p = subprocess.Popen([sys.executable, filename],
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.STDOUT, env=new_env)
            eventlet.sleep(0.4)  # wait for process to hit accept
            # suspend and resume to generate EINTR
            os.kill(p.pid, signal.SIGSTOP)
            os.kill(p.pid, signal.SIGCONT)
            output, _ = p.communicate()
            lines = output.decode('utf-8', 'replace').splitlines()
            # "lines and ..." turns empty output into a readable assertion
            # failure rather than an IndexError
            assert lines and "exited correctly" in lines[-1], output
        finally:
            # remove the temporary directory whether or not the test passed
            shutil.rmtree(self.tempdir)
|
|
|
|
|
|
def test_repeated_select_bad_fd():
    """Green ``select`` on fd -1 must raise ValueError on both of two calls."""
    from eventlet.green import select

    for _ in range(2):
        try:
            select.select([-1], [], [])
        except ValueError:
            pass
        else:
            assert False, 'Expected ValueError'
|
|
|
|
|
|
@skip_with_pyevent
def test_fork():
    """Hand the 'hub_fork.py' script to ``tests.run_isolated``."""
    script = 'hub_fork.py'
    tests.run_isolated(script)
|
|
|
|
|
|
def test_fork_simple():
    """Hand the 'hub_fork_simple.py' script to ``tests.run_isolated``."""
    script = 'hub_fork_simple.py'
    tests.run_isolated(script)
|
|
|
|
|
|
class TestDeadRunLoop(tests.LimitedTestCase):
    """Behaviour of greenthreads after the hub run loop has been killed.

    Both tests throw ``KeyboardInterrupt`` into the hub greenlet, then check
    that switching to the hub blocks until the 0.5 second timeout fires.
    """
    TEST_TIMEOUT = 2

    class CustomException(Exception):
        """Raised by the ``eventlet.Timeout`` guards in these tests."""

    def test_kill(self):
        """Killing a greenthread after the hub run loop died.

        Checks that the kill does not immediately return to the hub
        greenlet's parent and schedule a redundant timer.
        """
        hub = hubs.get_hub()

        def dummyproc():
            hub.switch()

        gt = eventlet.spawn(dummyproc)
        eventlet.sleep(0)  # let dummyproc run
        assert hub.greenlet.parent == eventlet.greenthread.getcurrent()
        interrupt = KeyboardInterrupt()
        self.assertRaises(KeyboardInterrupt, hub.greenlet.throw, interrupt)

        # kill dummyproc, this schedules a timer to return execution to
        # this greenlet before throwing an exception in dummyproc.
        # it is from this timer that execution should be returned to this
        # greenlet, and not by propagation from the terminating greenlet.
        gt.kill()
        with eventlet.Timeout(0.5, self.CustomException()):
            # we now switch to the hub, there should be no existing timers
            # that switch back to this greenlet and so this hub.switch()
            # call should block indefinitely.
            self.assertRaises(self.CustomException, hub.switch)

    def test_parent(self):
        """A finishing greenthread whose parent was a defunct hub greenlet.

        Checks that execution returns to the hub run loop and not to the
        hub greenlet's parent.
        """
        hub = hubs.get_hub()

        def dummyproc():
            pass

        gt = eventlet.spawn(dummyproc)
        assert hub.greenlet.parent == eventlet.greenthread.getcurrent()
        interrupt = KeyboardInterrupt()
        self.assertRaises(KeyboardInterrupt, hub.greenlet.throw, interrupt)

        assert not gt.dead  # check dummyproc hasn't completed
        with eventlet.Timeout(0.5, self.CustomException()):
            # we now switch to the hub which will allow
            # completion of dummyproc.
            # this should return execution back to the runloop and not
            # this greenlet so that hub.switch() would block indefinitely.
            self.assertRaises(self.CustomException, hub.switch)
        assert gt.dead  # sanity check that dummyproc has completed
|
|
|
|
|
|
class Foo(object):
    """Empty stand-in hub class passed to ``hubs.use_hub`` by the tests."""
|
|
|
|
|
|
class TestDefaultHub(ProcessBase):
    """Default hub selection when kqueue and epoll are unavailable."""

    def test_kqueue_unsupported(self):
        """``get_default_hub()`` must cope with a missing ``select.kqueue``.

        The child script deletes ``select.kqueue``, wraps ``__import__`` so
        imports naming 'epoll' fail and imports naming 'kqueue' are logged,
        then asks for the default hub. The expected output is the kqueue
        log line followed by 'ok'.
        """
        # https://github.com/eventlet/eventlet/issues/38
        # get_hub on windows broken by kqueue
        module_source = r'''
from __future__ import print_function

# Simulate absence of kqueue even on platforms that support it.
import select
try:
    del select.kqueue
except AttributeError:
    pass

from eventlet.support.six.moves import builtins

original_import = builtins.__import__

def fail_import(name, *args, **kwargs):
    if 'epoll' in name:
        raise ImportError('disabled for test')
    if 'kqueue' in name:
        print('kqueue tried')
    return original_import(name, *args, **kwargs)

builtins.__import__ = fail_import


import eventlet.hubs
eventlet.hubs.get_default_hub()
print('ok')
'''
        self.write_to_tempfile('newmod', module_source)
        result = self.launch_subprocess('newmod.py')
        output, _ = result
        self.assertEqual(output, 'kqueue tried\nok\n')
|