147 lines
4.5 KiB
Python
147 lines
4.5 KiB
Python
# package is named tests, not test, so it won't be confused with test in stdlib
|
|
import sys
|
|
import os
|
|
import errno
|
|
import unittest
|
|
import warnings
|
|
|
|
from eventlet import debug, hubs
|
|
|
|
# convenience for importers
|
|
main = unittest.main
|
|
|
|
def skipped(func):
|
|
""" Decorator that marks a function as skipped. Uses nose's SkipTest exception
|
|
if installed. Without nose, this will count skipped tests as passing tests."""
|
|
try:
|
|
from nose.plugins.skip import SkipTest
|
|
def skipme(*a, **k):
|
|
raise SkipTest()
|
|
skipme.__name__ = func.__name__
|
|
return skipme
|
|
except ImportError:
|
|
# no nose, we'll just skip the test ourselves
|
|
def skipme(*a, **k):
|
|
print "Skipping", func.__name__
|
|
skipme.__name__ = func.__name__
|
|
return skipme
|
|
|
|
|
|
def skip_if(condition):
|
|
""" Decorator that skips a test if the *condition* evaluates True.
|
|
*condition* can be a boolean or a callable that accepts one argument.
|
|
The callable will be called with the function to be decorated, and
|
|
should return True to skip the test.
|
|
"""
|
|
def skipped_wrapper(func):
|
|
if isinstance(condition, bool):
|
|
result = condition
|
|
else:
|
|
result = condition(func)
|
|
if result:
|
|
return skipped(func)
|
|
else:
|
|
return func
|
|
return skipped_wrapper
|
|
|
|
|
|
def skip_unless(condition):
|
|
""" Decorator that skips a test if the *condition* does not return True.
|
|
*condition* can be a boolean or a callable that accepts one argument.
|
|
The callable will be called with the function to be decorated, and
|
|
should return True if the condition is satisfied.
|
|
"""
|
|
def skipped_wrapper(func):
|
|
if isinstance(condition, bool):
|
|
result = condition
|
|
else:
|
|
result = condition(func)
|
|
if not result:
|
|
return skipped(func)
|
|
else:
|
|
return func
|
|
return skipped_wrapper
|
|
|
|
|
|
def requires_twisted(func):
|
|
""" Decorator that skips a test if Twisted is not present."""
|
|
def requirement(_f):
|
|
from eventlet.hubs import get_hub
|
|
try:
|
|
return 'Twisted' in type(get_hub()).__name__
|
|
except Exception:
|
|
return False
|
|
return skip_unless(requirement)(func)
|
|
|
|
|
|
def using_pyevent(_f):
|
|
from eventlet.hubs import get_hub
|
|
return 'pyevent' in type(get_hub()).__module__
|
|
|
|
def skip_with_pyevent(func):
|
|
""" Decorator that skips a test if we're using the pyevent hub."""
|
|
return skip_if(using_pyevent)(func)
|
|
|
|
|
|
def skip_on_windows(func):
|
|
""" Decorator that skips a test on Windows."""
|
|
import sys
|
|
return skip_if(sys.platform.startswith('win'))(func)
|
|
|
|
|
|
class TestIsTakingTooLong(Exception):
|
|
""" Custom exception class to be raised when a test's runtime exceeds a limit. """
|
|
pass
|
|
|
|
|
|
class LimitedTestCase(unittest.TestCase):
|
|
""" Unittest subclass that adds a timeout to all tests. Subclasses must
|
|
be sure to call the LimitedTestCase setUp and tearDown methods. The default
|
|
timeout is 1 second, change it by setting self.TEST_TIMEOUT to the desired
|
|
quantity."""
|
|
|
|
TEST_TIMEOUT = 1
|
|
def setUp(self):
|
|
import eventlet
|
|
self.timer = eventlet.Timeout(self.TEST_TIMEOUT,
|
|
TestIsTakingTooLong(self.TEST_TIMEOUT))
|
|
|
|
def tearDown(self):
|
|
self.timer.cancel()
|
|
try:
|
|
hub = hubs.get_hub()
|
|
num_readers = len(hub.get_readers())
|
|
num_writers = len(hub.get_writers())
|
|
assert num_readers == num_writers == 0
|
|
except AssertionError, e:
|
|
print "ERROR: Hub not empty"
|
|
print debug.format_hub_timers()
|
|
print debug.format_hub_listeners()
|
|
|
|
|
|
def verify_hub_empty():
|
|
from eventlet import hubs
|
|
hub = hubs.get_hub()
|
|
num_readers = len(hub.get_readers())
|
|
num_writers = len(hub.get_writers())
|
|
num_timers = hub.get_timers_count()
|
|
assert num_readers == 0 and num_writers == 0, "Readers: %s Writers: %s" % (num_readers, num_writers)
|
|
|
|
|
|
def find_command(command):
|
|
for dir in os.getenv('PATH', '/usr/bin:/usr/sbin').split(os.pathsep):
|
|
p = os.path.join(dir, command)
|
|
if os.access(p, os.X_OK):
|
|
return p
|
|
raise IOError(errno.ENOENT, 'Command not found: %r' % command)
|
|
|
|
def silence_warnings(func):
|
|
def wrapper(*args, **kw):
|
|
warnings.simplefilter('ignore', DeprecationWarning)
|
|
try:
|
|
return func(*args, **kw)
|
|
finally:
|
|
warnings.simplefilter('default', DeprecationWarning)
|
|
wrapper.__name__ = func.__name__
|
|
return wrapper
|