Refactored __init__ a little, skipped a processes-based tests on Windows.

This commit is contained in:
Ryan Williams
2009-12-07 14:39:26 -08:00
parent d3551e47bc
commit dd7257e237
3 changed files with 92 additions and 28 deletions

View File

@@ -4,6 +4,9 @@ import os
import errno
import unittest
# convenience
main = unittest.main
def skipped(func):
""" Decorator that marks a function as skipped. Uses nose's SkipTest exception
if installed. Without nose, this will count skipped tests as passing tests."""
@@ -21,24 +24,39 @@ def skipped(func):
return skipme
def skip_unless(requirement):
""" Decorator that skips a test if the *requirement* does not return True.
*requirement* can be a boolean or a callable that accepts one argument.
The callable will be called with the function to be decorated, and
should return True if the requirement is satisfied.
def skip_if(condition):
""" Decorator that skips a test if the *condition* evaluates True.
*condition* can be a boolean or a callable that accepts one argument.
The callable will be called with the function to be decorated, and
should return True to skip the test.
"""
if isinstance(requirement, bool):
def skipped_wrapper(func):
if not requirement:
return skipped(func)
else:
return func
else:
def skipped_wrapper(func):
if not requirement(func):
return skipped(func)
else:
return func
def skipped_wrapper(func):
if isinstance(condition, bool):
result = condition
else:
result = condition(func)
if result:
return skipped(func)
else:
return func
return skipped_wrapper
def skip_unless(condition):
""" Decorator that skips a test if the *condition* does not return True.
*condition* can be a boolean or a callable that accepts one argument.
The callable will be called with the function to be decorated, and
should return True if the condition is satisfied.
"""
def skipped_wrapper(func):
if isinstance(condition, bool):
result = condition
else:
result = condition(func)
if not result:
return skipped(func)
else:
return func
return skipped_wrapper
@@ -55,10 +73,15 @@ def requires_twisted(func):
def skip_with_libevent(func):
""" Decorator that skips a test if we're using the libevent hub."""
def requirement(_f):
def using_libevent(_f):
from eventlet.api import get_hub
return not('libevent' in type(get_hub()).__module__)
return skip_unless(requirement)(func)
return 'libevent' in type(get_hub()).__module__
return skip_if(using_libevent)(func)
def skip_on_windows(func):
import sys
return skip_if(sys.platform.startswith('win'))(func)
class TestIsTakingTooLong(Exception):

View File

@@ -1,12 +1,14 @@
import sys
from unittest import TestCase, main
from tests import LimitedTestCase, main, skip_on_windows
from eventlet import processes, api
class TestEchoPool(TestCase):
class TestEchoPool(LimitedTestCase):
def setUp(self):
super(TestEchoPool, self).setUp()
self.pool = processes.ProcessPool('echo', ["hello"])
@skip_on_windows
def test_echo(self):
result = None
@@ -17,6 +19,7 @@ class TestEchoPool(TestCase):
self.pool.put(proc)
self.assertEquals(result, 'hello\n')
@skip_on_windows
def test_read_eof(self):
proc = self.pool.get()
try:
@@ -24,18 +27,21 @@ class TestEchoPool(TestCase):
self.assertRaises(processes.DeadProcess, proc.read)
finally:
self.pool.put(proc)
@skip_on_windows
def test_empty_echo(self):
p = processes.Process('echo', ['-n'])
self.assertEquals('', p.read())
self.assertRaises(processes.DeadProcess, p.read)
class TestCatPool(TestCase):
class TestCatPool(LimitedTestCase):
def setUp(self):
super(TestCatPool, self).setUp()
api.sleep(0)
self.pool = processes.ProcessPool('cat')
@skip_on_windows
def test_cat(self):
result = None
@@ -49,6 +55,7 @@ class TestCatPool(TestCase):
self.assertEquals(result, 'goodbye')
@skip_on_windows
def test_write_to_dead(self):
result = None
@@ -61,6 +68,7 @@ class TestCatPool(TestCase):
finally:
self.pool.put(proc)
@skip_on_windows
def test_close(self):
result = None
@@ -73,10 +81,12 @@ class TestCatPool(TestCase):
self.pool.put(proc)
class TestDyingProcessesLeavePool(TestCase):
class TestDyingProcessesLeavePool(LimitedTestCase):
def setUp(self):
super(TestDyingProcessesLeavePool, self).setUp()
self.pool = processes.ProcessPool('echo', ['hello'], max_size=1)
@skip_on_windows
def test_dead_process_not_inserted_into_pool(self):
proc = self.pool.get()
try:

View File

@@ -5,7 +5,7 @@ import os
import sys
import tempfile
import time
import unittest
from tests import LimitedTestCase, main, skip_on_windows
import re
import StringIO
@@ -31,12 +31,13 @@ class CoroutineCallingClass(object):
return self._my_dict
class TestSaranwrap(unittest.TestCase):
class TestSaranwrap(LimitedTestCase):
def assert_server_exists(self, prox):
self.assert_(saranwrap.status(prox))
prox.foo = 0
self.assertEqual(0, prox.foo)
@skip_on_windows
def test_wrap_tuple(self):
my_tuple = (1, 2)
prox = saranwrap.wrap(my_tuple)
@@ -44,6 +45,7 @@ class TestSaranwrap(unittest.TestCase):
self.assertEqual(prox[1], 2)
self.assertEqual(len(my_tuple), 2)
@skip_on_windows
def test_wrap_string(self):
my_object = "whatever"
prox = saranwrap.wrap(my_object)
@@ -51,6 +53,7 @@ class TestSaranwrap(unittest.TestCase):
self.assertEqual(len(my_object), len(prox))
self.assertEqual(my_object.join(['a', 'b']), prox.join(['a', 'b']))
@skip_on_windows
def test_wrap_uniterable(self):
# here we're treating the exception as just a normal class
prox = saranwrap.wrap(FloatingPointError())
@@ -62,6 +65,7 @@ class TestSaranwrap(unittest.TestCase):
self.assertRaises(IndexError, index)
self.assertRaises(TypeError, key)
@skip_on_windows
def test_wrap_dict(self):
my_object = {'a':1}
prox = saranwrap.wrap(my_object)
@@ -71,6 +75,7 @@ class TestSaranwrap(unittest.TestCase):
self.assertEqual('saran:' + repr(my_object), repr(prox))
self.assertEqual('saran:' + `my_object`, `prox`)
@skip_on_windows
def test_wrap_module_class(self):
prox = saranwrap.wrap(re)
self.assertEqual(saranwrap.Proxy, type(prox))
@@ -78,6 +83,7 @@ class TestSaranwrap(unittest.TestCase):
self.assertEqual(exp.flags, 0)
self.assert_(repr(prox.compile))
@skip_on_windows
def test_wrap_eq(self):
prox = saranwrap.wrap(re)
exp1 = prox.compile('.')
@@ -86,6 +92,7 @@ class TestSaranwrap(unittest.TestCase):
exp3 = prox.compile('/')
self.assert_(exp1 != exp3)
@skip_on_windows
def test_wrap_nonzero(self):
prox = saranwrap.wrap(re)
exp1 = prox.compile('.')
@@ -93,6 +100,7 @@ class TestSaranwrap(unittest.TestCase):
prox2 = saranwrap.Proxy([1, 2, 3])
self.assert_(bool(prox2))
@skip_on_windows
def test_multiple_wraps(self):
prox1 = saranwrap.wrap(re)
prox2 = saranwrap.wrap(re)
@@ -101,6 +109,7 @@ class TestSaranwrap(unittest.TestCase):
del x2
x3 = prox2.compile('.')
@skip_on_windows
def test_dict_passthru(self):
prox = saranwrap.wrap(StringIO)
x = prox.StringIO('a')
@@ -108,25 +117,30 @@ class TestSaranwrap(unittest.TestCase):
# try it all on one line just for the sake of it
self.assertEqual(type(saranwrap.wrap(StringIO).StringIO('a').__dict__), saranwrap.ObjectProxy)
@skip_on_windows
def test_is_value(self):
server = saranwrap.Server(None, None, None)
self.assert_(server.is_value(None))
@skip_on_windows
def test_wrap_getitem(self):
prox = saranwrap.wrap([0,1,2])
self.assertEqual(prox[0], 0)
@skip_on_windows
def test_wrap_setitem(self):
prox = saranwrap.wrap([0,1,2])
prox[1] = 2
self.assertEqual(prox[1], 2)
@skip_on_windows
def test_raising_exceptions(self):
prox = saranwrap.wrap(re)
def nofunc():
prox.never_name_a_function_like_this()
self.assertRaises(AttributeError, nofunc)
@skip_on_windows
def test_unpicklable_server_exception(self):
prox = saranwrap.wrap(saranwrap)
def unpickle():
@@ -137,6 +151,7 @@ class TestSaranwrap(unittest.TestCase):
# It's basically dead
#self.assert_server_exists(prox)
@skip_on_windows
def test_pickleable_server_exception(self):
prox = saranwrap.wrap(saranwrap)
def fperror():
@@ -145,11 +160,13 @@ class TestSaranwrap(unittest.TestCase):
self.assertRaises(FloatingPointError, fperror)
self.assert_server_exists(prox)
@skip_on_windows
def test_print_does_not_break_wrapper(self):
prox = saranwrap.wrap(saranwrap)
prox.print_string('hello')
self.assert_server_exists(prox)
@skip_on_windows
def test_stderr_does_not_break_wrapper(self):
prox = saranwrap.wrap(saranwrap)
prox.err_string('goodbye')
@@ -158,6 +175,7 @@ class TestSaranwrap(unittest.TestCase):
def assertLessThan(self, a, b):
self.assert_(a < b, "%s is not less than %s" % (a, b))
@skip_on_windows
def test_status(self):
prox = saranwrap.wrap(time)
a = prox.gmtime(0)
@@ -176,6 +194,7 @@ class TestSaranwrap(unittest.TestCase):
prox2 = saranwrap.wrap(re)
self.assert_(status['pid'] != saranwrap.status(prox2)['pid'])
@skip_on_windows
def test_del(self):
prox = saranwrap.wrap(time)
delme = prox.gmtime(0)
@@ -189,11 +208,13 @@ class TestSaranwrap(unittest.TestCase):
#print status_after['objects']
self.assertLessThan(status_after['object_count'], status_before['object_count'])
@skip_on_windows
def test_contains(self):
prox = saranwrap.wrap({'a':'b'})
self.assert_('a' in prox)
self.assert_('x' not in prox)
@skip_on_windows
def test_variable_and_keyword_arguments_with_function_calls(self):
import optparse
prox = saranwrap.wrap(optparse)
@@ -202,6 +223,7 @@ class TestSaranwrap(unittest.TestCase):
opts,args = parser.parse_args(["-nfoo"])
self.assertEqual(opts.n, 'foo')
@skip_on_windows
def test_original_proxy_going_out_of_scope(self):
def make_re():
prox = saranwrap.wrap(re)
@@ -224,6 +246,7 @@ class TestSaranwrap(unittest.TestCase):
except AttributeError, e:
pass
@skip_on_windows
def test_not_inheriting_pythonpath(self):
# construct a fake module in the temp directory
temp_dir = tempfile.mkdtemp("saranwrap_test")
@@ -253,6 +276,7 @@ sys_path = sys.path""")
shutil.rmtree(temp_dir)
sys.path.remove(temp_dir)
@skip_on_windows
def test_contention(self):
from tests import saranwrap_test
prox = saranwrap.wrap(saranwrap_test)
@@ -265,6 +289,7 @@ sys_path = sys.path""")
for waiter in waiters:
waiter.wait()
@skip_on_windows
def test_copy(self):
import copy
compound_object = {'a':[1,2,3]}
@@ -278,12 +303,14 @@ sys_path = sys.path""")
make_assertions(copy.copy(prox))
make_assertions(copy.deepcopy(prox))
@skip_on_windows
def test_list_of_functions(self):
return # this test is known to fail, we can implement it sometime in the future if we wish
from tests import saranwrap_test
prox = saranwrap.wrap([saranwrap_test.list_maker])
self.assertEquals(list_maker(), prox[0]())
@skip_on_windows
def test_under_the_hood_coroutines(self):
# so, we want to write a class which uses a coroutine to call
# a function. Then we want to saranwrap that class, have
@@ -302,6 +329,7 @@ sys_path = sys.path""")
'random' in obj_proxy.get_dict(),
'Coroutine in saranwrapped object did not run')
@skip_on_windows
def test_child_process_death(self):
prox = saranwrap.wrap({})
pid = saranwrap.getpid(prox)
@@ -310,17 +338,20 @@ sys_path = sys.path""")
api.sleep(0.1) # need to let the signal handler run
self.assertRaises(OSError, os.kill, pid, 0) # raises OSError if pid doesn't exist
@skip_on_windows
def test_detection_of_server_crash(self):
# make the server crash here
pass
@skip_on_windows
def test_equality_with_local_object(self):
# we'll implement this if there's a use case for it
pass
@skip_on_windows
def test_non_blocking(self):
# here we test whether it's nonblocking
pass
if __name__ == '__main__':
unittest.main()
main()