From dd7257e2378ce7f41899d273bfbbb7910ac80448 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Mon, 7 Dec 2009 14:39:26 -0800 Subject: [PATCH] Refactored __init__ a little, skipped a processes-based tests on Windows. --- tests/__init__.py | 63 ++++++++++++++++++++++++++++------------- tests/processes_test.py | 20 +++++++++---- tests/saranwrap_test.py | 37 ++++++++++++++++++++++-- 3 files changed, 92 insertions(+), 28 deletions(-) diff --git a/tests/__init__.py b/tests/__init__.py index d0b6e04..a1df6a8 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -4,6 +4,9 @@ import os import errno import unittest +# convenience +main = unittest.main + def skipped(func): """ Decorator that marks a function as skipped. Uses nose's SkipTest exception if installed. Without nose, this will count skipped tests as passing tests.""" @@ -21,24 +24,39 @@ def skipped(func): return skipme -def skip_unless(requirement): - """ Decorator that skips a test if the *requirement* does not return True. - *requirement* can be a boolean or a callable that accepts one argument. - The callable will be called with the function to be decorated, and - should return True if the requirement is satisfied. +def skip_if(condition): + """ Decorator that skips a test if the *condition* evaluates True. + *condition* can be a boolean or a callable that accepts one argument. + The callable will be called with the function to be decorated, and + should return True to skip the test. """ - if isinstance(requirement, bool): - def skipped_wrapper(func): - if not requirement: - return skipped(func) - else: - return func - else: - def skipped_wrapper(func): - if not requirement(func): - return skipped(func) - else: - return func + def skipped_wrapper(func): + if isinstance(condition, bool): + result = condition + else: + result = condition(func) + if result: + return skipped(func) + else: + return func + return skipped_wrapper + + +def skip_unless(condition): + """ Decorator that skips a test if the *condition* does not return True. + *condition* can be a boolean or a callable that accepts one argument. + The callable will be called with the function to be decorated, and + should return True if the condition is satisfied. + """ + def skipped_wrapper(func): + if isinstance(condition, bool): + result = condition + else: + result = condition(func) + if not result: + return skipped(func) + else: + return func return skipped_wrapper @@ -55,10 +73,15 @@ def requires_twisted(func): def skip_with_libevent(func): """ Decorator that skips a test if we're using the libevent hub.""" - def requirement(_f): + def using_libevent(_f): from eventlet.api import get_hub - return not('libevent' in type(get_hub()).__module__) - return skip_unless(requirement)(func) + return 'libevent' in type(get_hub()).__module__ + return skip_if(using_libevent)(func) + + +def skip_on_windows(func): + import sys + return skip_if(sys.platform.startswith('win'))(func) class TestIsTakingTooLong(Exception): diff --git a/tests/processes_test.py b/tests/processes_test.py index 2bcd81e..17355ed 100644 --- a/tests/processes_test.py +++ b/tests/processes_test.py @@ -1,12 +1,14 @@ import sys -from unittest import TestCase, main +from tests import LimitedTestCase, main, skip_on_windows from eventlet import processes, api -class TestEchoPool(TestCase): +class TestEchoPool(LimitedTestCase): def setUp(self): + super(TestEchoPool, self).setUp() self.pool = processes.ProcessPool('echo', ["hello"]) + @skip_on_windows def test_echo(self): result = None @@ -17,6 +19,7 @@ class TestEchoPool(TestCase): self.pool.put(proc) self.assertEquals(result, 'hello\n') + @skip_on_windows def test_read_eof(self): proc = self.pool.get() try: @@ -24,18 +27,21 @@ class TestEchoPool(TestCase): self.assertRaises(processes.DeadProcess, proc.read) finally: self.pool.put(proc) - + + @skip_on_windows def test_empty_echo(self): p = processes.Process('echo', ['-n']) self.assertEquals('', p.read()) self.assertRaises(processes.DeadProcess, p.read) -class TestCatPool(TestCase): +class TestCatPool(LimitedTestCase): def setUp(self): + super(TestCatPool, self).setUp() api.sleep(0) self.pool = processes.ProcessPool('cat') + @skip_on_windows def test_cat(self): result = None @@ -49,6 +55,7 @@ class TestCatPool(TestCase): self.assertEquals(result, 'goodbye') + @skip_on_windows def test_write_to_dead(self): result = None @@ -61,6 +68,7 @@ class TestCatPool(TestCase): finally: self.pool.put(proc) + @skip_on_windows def test_close(self): result = None @@ -73,10 +81,12 @@ class TestCatPool(TestCase): self.pool.put(proc) -class TestDyingProcessesLeavePool(TestCase): +class TestDyingProcessesLeavePool(LimitedTestCase): def setUp(self): + super(TestDyingProcessesLeavePool, self).setUp() self.pool = processes.ProcessPool('echo', ['hello'], max_size=1) + @skip_on_windows def test_dead_process_not_inserted_into_pool(self): proc = self.pool.get() try: diff --git a/tests/saranwrap_test.py b/tests/saranwrap_test.py index 7d6c580..03fb1e2 100644 --- a/tests/saranwrap_test.py +++ b/tests/saranwrap_test.py @@ -5,7 +5,7 @@ import os import sys import tempfile import time -import unittest +from tests import LimitedTestCase, main, skip_on_windows import re import StringIO @@ -31,12 +31,13 @@ class CoroutineCallingClass(object): return self._my_dict -class TestSaranwrap(unittest.TestCase): +class TestSaranwrap(LimitedTestCase): def assert_server_exists(self, prox): self.assert_(saranwrap.status(prox)) prox.foo = 0 self.assertEqual(0, prox.foo) + @skip_on_windows def test_wrap_tuple(self): my_tuple = (1, 2) prox = saranwrap.wrap(my_tuple) @@ -44,6 +45,7 @@ class TestSaranwrap(unittest.TestCase): self.assertEqual(prox[1], 2) self.assertEqual(len(my_tuple), 2) + @skip_on_windows def test_wrap_string(self): my_object = "whatever" prox = saranwrap.wrap(my_object) @@ -51,6 +53,7 @@ class TestSaranwrap(unittest.TestCase): self.assertEqual(len(my_object), len(prox)) self.assertEqual(my_object.join(['a', 'b']), prox.join(['a', 'b'])) + @skip_on_windows def test_wrap_uniterable(self): # here we're treating the exception as just a normal class prox = saranwrap.wrap(FloatingPointError()) @@ -62,6 +65,7 @@ class TestSaranwrap(unittest.TestCase): self.assertRaises(IndexError, index) self.assertRaises(TypeError, key) + @skip_on_windows def test_wrap_dict(self): my_object = {'a':1} prox = saranwrap.wrap(my_object) @@ -71,6 +75,7 @@ class TestSaranwrap(unittest.TestCase): self.assertEqual('saran:' + repr(my_object), repr(prox)) self.assertEqual('saran:' + `my_object`, `prox`) + @skip_on_windows def test_wrap_module_class(self): prox = saranwrap.wrap(re) self.assertEqual(saranwrap.Proxy, type(prox)) @@ -78,6 +83,7 @@ class TestSaranwrap(unittest.TestCase): self.assertEqual(exp.flags, 0) self.assert_(repr(prox.compile)) + @skip_on_windows def test_wrap_eq(self): prox = saranwrap.wrap(re) exp1 = prox.compile('.') @@ -86,6 +92,7 @@ class TestSaranwrap(unittest.TestCase): exp3 = prox.compile('/') self.assert_(exp1 != exp3) + @skip_on_windows def test_wrap_nonzero(self): prox = saranwrap.wrap(re) exp1 = prox.compile('.') @@ -93,6 +100,7 @@ class TestSaranwrap(unittest.TestCase): prox2 = saranwrap.Proxy([1, 2, 3]) self.assert_(bool(prox2)) + @skip_on_windows def test_multiple_wraps(self): prox1 = saranwrap.wrap(re) prox2 = saranwrap.wrap(re) @@ -101,6 +109,7 @@ class TestSaranwrap(unittest.TestCase): del x2 x3 = prox2.compile('.') + @skip_on_windows def test_dict_passthru(self): prox = saranwrap.wrap(StringIO) x = prox.StringIO('a') @@ -108,25 +117,30 @@ class TestSaranwrap(unittest.TestCase): # try it all on one line just for the sake of it self.assertEqual(type(saranwrap.wrap(StringIO).StringIO('a').__dict__), saranwrap.ObjectProxy) + @skip_on_windows def test_is_value(self): server = saranwrap.Server(None, None, None) self.assert_(server.is_value(None)) + @skip_on_windows def test_wrap_getitem(self): prox = saranwrap.wrap([0,1,2]) self.assertEqual(prox[0], 0) + @skip_on_windows def test_wrap_setitem(self): prox = saranwrap.wrap([0,1,2]) prox[1] = 2 self.assertEqual(prox[1], 2) + @skip_on_windows def test_raising_exceptions(self): prox = saranwrap.wrap(re) def nofunc(): prox.never_name_a_function_like_this() self.assertRaises(AttributeError, nofunc) + @skip_on_windows def test_unpicklable_server_exception(self): prox = saranwrap.wrap(saranwrap) def unpickle(): @@ -137,6 +151,7 @@ class TestSaranwrap(unittest.TestCase): # It's basically dead #self.assert_server_exists(prox) + @skip_on_windows def test_pickleable_server_exception(self): prox = saranwrap.wrap(saranwrap) def fperror(): @@ -145,11 +160,13 @@ class TestSaranwrap(unittest.TestCase): self.assertRaises(FloatingPointError, fperror) self.assert_server_exists(prox) + @skip_on_windows def test_print_does_not_break_wrapper(self): prox = saranwrap.wrap(saranwrap) prox.print_string('hello') self.assert_server_exists(prox) + @skip_on_windows def test_stderr_does_not_break_wrapper(self): prox = saranwrap.wrap(saranwrap) prox.err_string('goodbye') @@ -158,6 +175,7 @@ class TestSaranwrap(unittest.TestCase): def assertLessThan(self, a, b): self.assert_(a < b, "%s is not less than %s" % (a, b)) + @skip_on_windows def test_status(self): prox = saranwrap.wrap(time) a = prox.gmtime(0) @@ -176,6 +194,7 @@ class TestSaranwrap(unittest.TestCase): prox2 = saranwrap.wrap(re) self.assert_(status['pid'] != saranwrap.status(prox2)['pid']) + @skip_on_windows def test_del(self): prox = saranwrap.wrap(time) delme = prox.gmtime(0) @@ -189,11 +208,13 @@ class TestSaranwrap(unittest.TestCase): #print status_after['objects'] self.assertLessThan(status_after['object_count'], status_before['object_count']) + @skip_on_windows def test_contains(self): prox = saranwrap.wrap({'a':'b'}) self.assert_('a' in prox) self.assert_('x' not in prox) + @skip_on_windows def test_variable_and_keyword_arguments_with_function_calls(self): import optparse prox = saranwrap.wrap(optparse) @@ -202,6 +223,7 @@ class TestSaranwrap(unittest.TestCase): opts,args = parser.parse_args(["-nfoo"]) self.assertEqual(opts.n, 'foo') + @skip_on_windows def test_original_proxy_going_out_of_scope(self): def make_re(): prox = saranwrap.wrap(re) @@ -224,6 +246,7 @@ class TestSaranwrap(unittest.TestCase): except AttributeError, e: pass + @skip_on_windows def test_not_inheriting_pythonpath(self): # construct a fake module in the temp directory temp_dir = tempfile.mkdtemp("saranwrap_test") @@ -253,6 +276,7 @@ sys_path = sys.path""") shutil.rmtree(temp_dir) sys.path.remove(temp_dir) + @skip_on_windows def test_contention(self): from tests import saranwrap_test prox = saranwrap.wrap(saranwrap_test) @@ -265,6 +289,7 @@ sys_path = sys.path""") for waiter in waiters: waiter.wait() + @skip_on_windows def test_copy(self): import copy compound_object = {'a':[1,2,3]} @@ -278,12 +303,14 @@ sys_path = sys.path""") make_assertions(copy.copy(prox)) make_assertions(copy.deepcopy(prox)) + @skip_on_windows def test_list_of_functions(self): return # this test is known to fail, we can implement it sometime in the future if we wish from tests import saranwrap_test prox = saranwrap.wrap([saranwrap_test.list_maker]) self.assertEquals(list_maker(), prox[0]()) + @skip_on_windows def test_under_the_hood_coroutines(self): # so, we want to write a class which uses a coroutine to call # a function. Then we want to saranwrap that class, have @@ -302,6 +329,7 @@ sys_path = sys.path""") 'random' in obj_proxy.get_dict(), 'Coroutine in saranwrapped object did not run') + @skip_on_windows def test_child_process_death(self): prox = saranwrap.wrap({}) pid = saranwrap.getpid(prox) @@ -310,17 +338,20 @@ sys_path = sys.path""") api.sleep(0.1) # need to let the signal handler run self.assertRaises(OSError, os.kill, pid, 0) # raises OSError if pid doesn't exist + @skip_on_windows def test_detection_of_server_crash(self): # make the server crash here pass + @skip_on_windows def test_equality_with_local_object(self): # we'll implement this if there's a use case for it pass + @skip_on_windows def test_non_blocking(self): # here we test whether it's nonblocking pass if __name__ == '__main__': - unittest.main() + main()