Files
deb-python-eventlet/tests/saranwrap_test.py

389 lines
12 KiB
Python

from eventlet import api, saranwrap
from eventlet import greenpool
import os
import eventlet
import sys
import tempfile
import time
from tests import LimitedTestCase, main, skip_on_windows, skip_with_pyevent
import re
import StringIO
# random test stuff
def list_maker():
return [0,1,2]
one = 1
two = 2
three = 3
class CoroutineCallingClass(object):
def __init__(self):
self._my_dict = {}
def run_coroutine(self):
eventlet.spawn_n(self._add_random_key)
def _add_random_key(self):
self._my_dict['random'] = 'yes, random'
def get_dict(self):
return self._my_dict
class TestSaranwrap(LimitedTestCase):
TEST_TIMEOUT=8
def assert_server_exists(self, prox):
self.assert_(saranwrap.status(prox))
prox.foo = 0
self.assertEqual(0, prox.foo)
@skip_on_windows
@skip_with_pyevent
def test_wrap_tuple(self):
my_tuple = (1, 2)
prox = saranwrap.wrap(my_tuple)
self.assertEqual(prox[0], 1)
self.assertEqual(prox[1], 2)
self.assertEqual(len(my_tuple), 2)
@skip_on_windows
@skip_with_pyevent
def test_wrap_string(self):
my_object = "whatever"
prox = saranwrap.wrap(my_object)
self.assertEqual(str(my_object), str(prox))
self.assertEqual(len(my_object), len(prox))
self.assertEqual(my_object.join(['a', 'b']), prox.join(['a', 'b']))
@skip_on_windows
@skip_with_pyevent
def test_wrap_uniterable(self):
# here we're treating the exception as just a normal class
prox = saranwrap.wrap(FloatingPointError())
def index():
prox[0]
def key():
prox['a']
self.assertRaises(IndexError, index)
self.assertRaises(TypeError, key)
@skip_on_windows
@skip_with_pyevent
def test_wrap_dict(self):
my_object = {'a':1}
prox = saranwrap.wrap(my_object)
self.assertEqual('a', prox.keys()[0])
self.assertEqual(1, prox['a'])
self.assertEqual(str(my_object), str(prox))
self.assertEqual('saran:' + repr(my_object), repr(prox))
self.assertEqual('saran:' + `my_object`, `prox`)
@skip_on_windows
@skip_with_pyevent
def test_wrap_module_class(self):
prox = saranwrap.wrap(re)
self.assertEqual(saranwrap.Proxy, type(prox))
exp = prox.compile('.')
self.assertEqual(exp.flags, 0)
self.assert_(repr(prox.compile))
@skip_on_windows
@skip_with_pyevent
def test_wrap_eq(self):
prox = saranwrap.wrap(re)
exp1 = prox.compile('.')
exp2 = prox.compile(exp1.pattern)
self.assertEqual(exp1, exp2)
exp3 = prox.compile('/')
self.assert_(exp1 != exp3)
@skip_on_windows
@skip_with_pyevent
def test_wrap_nonzero(self):
prox = saranwrap.wrap(re)
exp1 = prox.compile('.')
self.assert_(bool(exp1))
prox2 = saranwrap.Proxy([1, 2, 3])
self.assert_(bool(prox2))
@skip_on_windows
@skip_with_pyevent
def test_multiple_wraps(self):
prox1 = saranwrap.wrap(re)
prox2 = saranwrap.wrap(re)
x1 = prox1.compile('.')
x2 = prox1.compile('.')
del x2
x3 = prox2.compile('.')
@skip_on_windows
@skip_with_pyevent
def test_dict_passthru(self):
prox = saranwrap.wrap(StringIO)
x = prox.StringIO('a')
self.assertEqual(type(x.__dict__), saranwrap.ObjectProxy)
# try it all on one line just for the sake of it
self.assertEqual(type(saranwrap.wrap(StringIO).StringIO('a').__dict__), saranwrap.ObjectProxy)
@skip_on_windows
@skip_with_pyevent
def test_is_value(self):
server = saranwrap.Server(None, None, None)
self.assert_(server.is_value(None))
@skip_on_windows
@skip_with_pyevent
def test_wrap_getitem(self):
prox = saranwrap.wrap([0,1,2])
self.assertEqual(prox[0], 0)
@skip_on_windows
@skip_with_pyevent
def test_wrap_setitem(self):
prox = saranwrap.wrap([0,1,2])
prox[1] = 2
self.assertEqual(prox[1], 2)
@skip_on_windows
@skip_with_pyevent
def test_raising_exceptions(self):
prox = saranwrap.wrap(re)
def nofunc():
prox.never_name_a_function_like_this()
self.assertRaises(AttributeError, nofunc)
@skip_on_windows
@skip_with_pyevent
def test_unpicklable_server_exception(self):
prox = saranwrap.wrap(saranwrap)
def unpickle():
prox.raise_an_unpicklable_error()
self.assertRaises(saranwrap.UnrecoverableError, unpickle)
# It's basically dead
#self.assert_server_exists(prox)
@skip_on_windows
@skip_with_pyevent
def test_pickleable_server_exception(self):
prox = saranwrap.wrap(saranwrap)
def fperror():
prox.raise_standard_error()
self.assertRaises(FloatingPointError, fperror)
self.assert_server_exists(prox)
@skip_on_windows
@skip_with_pyevent
def test_print_does_not_break_wrapper(self):
prox = saranwrap.wrap(saranwrap)
prox.print_string('hello')
self.assert_server_exists(prox)
@skip_on_windows
@skip_with_pyevent
def test_stderr_does_not_break_wrapper(self):
prox = saranwrap.wrap(saranwrap)
prox.err_string('goodbye')
self.assert_server_exists(prox)
def assertLessThan(self, a, b):
self.assert_(a < b, "%s is not less than %s" % (a, b))
@skip_on_windows
@skip_with_pyevent
def test_status(self):
prox = saranwrap.wrap(time)
a = prox.gmtime(0)
status = saranwrap.status(prox)
self.assertEqual(status['object_count'], 1)
self.assertEqual(status['next_id'], 2)
self.assert_(status['pid']) # can't guess what it will be
# status of an object should be the same as the module
self.assertEqual(saranwrap.status(a), status)
# create a new one then immediately delete it
prox.gmtime(1)
is_id = prox.ctime(1) # sync up deletes
status = saranwrap.status(prox)
self.assertEqual(status['object_count'], 1)
self.assertEqual(status['next_id'], 3)
prox2 = saranwrap.wrap(re)
self.assert_(status['pid'] != saranwrap.status(prox2)['pid'])
@skip_on_windows
@skip_with_pyevent
def test_del(self):
prox = saranwrap.wrap(time)
delme = prox.gmtime(0)
status_before = saranwrap.status(prox)
#print status_before['objects']
del delme
# need to do an access that doesn't create an object
# in order to sync up the deleted objects
prox.ctime(1)
status_after = saranwrap.status(prox)
#print status_after['objects']
self.assertLessThan(status_after['object_count'], status_before['object_count'])
@skip_on_windows
@skip_with_pyevent
def test_contains(self):
prox = saranwrap.wrap({'a':'b'})
self.assert_('a' in prox)
self.assert_('x' not in prox)
@skip_on_windows
@skip_with_pyevent
def test_variable_and_keyword_arguments_with_function_calls(self):
import optparse
prox = saranwrap.wrap(optparse)
parser = prox.OptionParser()
z = parser.add_option('-n', action='store', type='string', dest='n')
opts,args = parser.parse_args(["-nfoo"])
self.assertEqual(opts.n, 'foo')
@skip_on_windows
@skip_with_pyevent
def test_original_proxy_going_out_of_scope(self):
def make_re():
prox = saranwrap.wrap(re)
# after this function returns, prox should fall out of scope
return prox.compile('.')
tid = make_re()
self.assertEqual(tid.flags, 0)
def make_list():
from tests import saranwrap_test
prox = saranwrap.wrap(saranwrap_test.list_maker)
# after this function returns, prox should fall out of scope
return prox()
proxl = make_list()
self.assertEqual(proxl[2], 2)
def test_status_of_none(self):
try:
saranwrap.status(None)
self.assert_(False)
except AttributeError, e:
pass
@skip_on_windows
@skip_with_pyevent
def test_not_inheriting_pythonpath(self):
# construct a fake module in the temp directory
temp_dir = tempfile.mkdtemp("saranwrap_test")
fp = open(os.path.join(temp_dir, "tempmod.py"), "w")
fp.write("""import os, sys
pypath = os.environ['PYTHONPATH']
sys_path = sys.path""")
fp.close()
# this should fail because we haven't stuck the temp_dir in our path yet
prox = saranwrap.wrap_module('tempmod')
try:
prox.pypath
self.fail()
except ImportError:
pass
# now try to saranwrap it
sys.path.append(temp_dir)
try:
import tempmod
prox = saranwrap.wrap(tempmod)
self.assert_(prox.pypath.count(temp_dir))
self.assert_(prox.sys_path.count(temp_dir))
finally:
import shutil
shutil.rmtree(temp_dir)
sys.path.remove(temp_dir)
@skip_on_windows
@skip_with_pyevent
def test_contention(self):
from tests import saranwrap_test
prox = saranwrap.wrap(saranwrap_test)
pool = greenpool.GreenPool(4)
pool.spawn_n(lambda: self.assertEquals(prox.one, 1))
pool.spawn_n(lambda: self.assertEquals(prox.two, 2))
pool.spawn_n(lambda: self.assertEquals(prox.three, 3))
pool.waitall()
@skip_on_windows
@skip_with_pyevent
def test_copy(self):
import copy
compound_object = {'a':[1,2,3]}
prox = saranwrap.wrap(compound_object)
def make_assertions(copied):
self.assert_(isinstance(copied, dict))
self.assert_(isinstance(copied['a'], list))
self.assertEquals(copied, compound_object)
self.assertNotEqual(id(compound_object), id(copied))
make_assertions(copy.copy(prox))
make_assertions(copy.deepcopy(prox))
@skip_on_windows
@skip_with_pyevent
def test_list_of_functions(self):
return # this test is known to fail, we can implement it sometime in the future if we wish
from tests import saranwrap_test
prox = saranwrap.wrap([saranwrap_test.list_maker])
self.assertEquals(list_maker(), prox[0]())
@skip_on_windows
@skip_with_pyevent
def test_under_the_hood_coroutines(self):
# so, we want to write a class which uses a coroutine to call
# a function. Then we want to saranwrap that class, have
# the object call the coroutine and verify that it ran
from tests import saranwrap_test
mod_proxy = saranwrap.wrap(saranwrap_test)
obj_proxy = mod_proxy.CoroutineCallingClass()
obj_proxy.run_coroutine()
# sleep for a bit to make sure out coroutine ran by the time
# we check the assert below
api.sleep(0.1)
self.assert_(
'random' in obj_proxy.get_dict(),
'Coroutine in saranwrapped object did not run')
@skip_on_windows
@skip_with_pyevent
def test_child_process_death(self):
prox = saranwrap.wrap({})
pid = saranwrap.getpid(prox)
self.assertEqual(os.kill(pid, 0), None) # assert that the process is running
del prox # removing all references to the proxy should kill the child process
api.sleep(0.1) # need to let the signal handler run
self.assertRaises(OSError, os.kill, pid, 0) # raises OSError if pid doesn't exist
@skip_on_windows
@skip_with_pyevent
def test_detection_of_server_crash(self):
# make the server crash here
pass
@skip_on_windows
@skip_with_pyevent
def test_equality_with_local_object(self):
# we'll implement this if there's a use case for it
pass
@skip_on_windows
@skip_with_pyevent
def test_non_blocking(self):
# here we test whether it's nonblocking
pass
if __name__ == '__main__':
main()