# Copyright (c) 2007, Linden Research, Inc. # Copyright (c) 2007, IBM Corp. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from __future__ import print_function import gc import random import re import time import eventlet from eventlet import tpool from eventlet.support import six import tests one = 1 two = 2 three = 3 none = None def noop(): pass def raise_exception(): raise RuntimeError("hi") class TestTpool(tests.LimitedTestCase): def setUp(self): super(TestTpool, self).setUp() def tearDown(self): tpool.killall() super(TestTpool, self).tearDown() @tests.skip_with_pyevent def test_wrap_tuple(self): my_tuple = (1, 2) prox = tpool.Proxy(my_tuple) self.assertEqual(prox[0], 1) self.assertEqual(prox[1], 2) self.assertEqual(len(my_tuple), 2) @tests.skip_with_pyevent def test_wrap_string(self): my_object = "whatever" prox = tpool.Proxy(my_object) self.assertEqual(str(my_object), str(prox)) self.assertEqual(len(my_object), len(prox)) self.assertEqual(my_object.join(['a', 'b']), prox.join(['a', 'b'])) @tests.skip_with_pyevent def test_wrap_uniterable(self): prox = tpool.Proxy([]) def index(): prox[0] def key(): prox['a'] self.assertRaises(IndexError, index) self.assertRaises(TypeError, key) @tests.skip_with_pyevent def test_wrap_dict(self): my_object = {'a': 1} prox = tpool.Proxy(my_object) self.assertEqual('a', list(prox.keys())[0]) self.assertEqual(1, prox['a']) self.assertEqual(str(my_object), str(prox)) self.assertEqual(repr(my_object), repr(prox)) @tests.skip_with_pyevent def test_wrap_module_class(self): prox = tpool.Proxy(re) self.assertEqual(tpool.Proxy, type(prox)) exp = prox.compile('(.)(.)(.)') self.assertEqual(exp.groups, 3) assert repr(prox.compile) @tests.skip_with_pyevent def test_wrap_eq(self): prox = tpool.Proxy(re) exp1 = prox.compile('.') exp2 = prox.compile(exp1.pattern) self.assertEqual(exp1, exp2) exp3 = prox.compile('/') assert exp1 != exp3 @tests.skip_with_pyevent def test_wrap_ints(self): p = tpool.Proxy(4) assert p == 4 @tests.skip_with_pyevent def test_wrap_hash(self): prox1 = tpool.Proxy('' + 'A') prox2 = tpool.Proxy('A' + '') assert prox1 == 'A' assert 'A' == prox2 # assert prox1 == prox2 FIXME - could __eq__ unwrap rhs if it is other proxy? self.assertEqual(hash(prox1), hash(prox2)) proxList = tpool.Proxy([]) self.assertRaises(TypeError, hash, proxList) @tests.skip_with_pyevent def test_wrap_nonzero(self): prox = tpool.Proxy(re) exp1 = prox.compile('.') assert bool(exp1) prox2 = tpool.Proxy([1, 2, 3]) assert bool(prox2) @tests.skip_with_pyevent def test_multiple_wraps(self): prox1 = tpool.Proxy(re) prox2 = tpool.Proxy(re) prox1.compile('.') x2 = prox1.compile('.') del x2 prox2.compile('.') @tests.skip_with_pyevent def test_wrap_getitem(self): prox = tpool.Proxy([0, 1, 2]) self.assertEqual(prox[0], 0) @tests.skip_with_pyevent def test_wrap_setitem(self): prox = tpool.Proxy([0, 1, 2]) prox[1] = 2 self.assertEqual(prox[1], 2) @tests.skip_with_pyevent def test_wrap_iterator(self): self.reset_timeout(2) prox = tpool.Proxy(range(10)) result = [] for i in prox: result.append(i) self.assertEqual(list(range(10)), result) @tests.skip_with_pyevent def test_wrap_iterator2(self): self.reset_timeout(5) # might take a while due to imprecise sleeping def foo(): import time for x in range(2): yield x time.sleep(0.001) counter = [0] def tick(): for i in six.moves.range(20000): counter[0] += 1 if counter[0] % 20 == 0: eventlet.sleep(0.0001) else: eventlet.sleep() gt = eventlet.spawn(tick) previtem = 0 for item in tpool.Proxy(foo()): assert item >= previtem # make sure the tick happened at least a few times so that we know # that our iterations in foo() were actually tpooled assert counter[0] > 10, counter[0] gt.kill() @tests.skip_with_pyevent def test_raising_exceptions(self): prox = tpool.Proxy(re) def nofunc(): prox.never_name_a_function_like_this() self.assertRaises(AttributeError, nofunc) from tests import tpool_test prox = tpool.Proxy(tpool_test) self.assertRaises(RuntimeError, prox.raise_exception) @tests.skip_with_pyevent def test_variable_and_keyword_arguments_with_function_calls(self): import optparse parser = tpool.Proxy(optparse.OptionParser()) parser.add_option('-n', action='store', type='string', dest='n') opts, args = parser.parse_args(["-nfoo"]) self.assertEqual(opts.n, 'foo') @tests.skip_with_pyevent def test_contention(self): from tests import tpool_test prox = tpool.Proxy(tpool_test) pile = eventlet.GreenPile(4) pile.spawn(lambda: self.assertEqual(prox.one, 1)) pile.spawn(lambda: self.assertEqual(prox.two, 2)) pile.spawn(lambda: self.assertEqual(prox.three, 3)) results = list(pile) self.assertEqual(len(results), 3) @tests.skip_with_pyevent def test_timeout(self): blocking = eventlet.patcher.original('time') eventlet.Timeout(0.1, eventlet.Timeout()) try: tpool.execute(blocking.sleep, 0.3) assert False, 'Expected Timeout' except eventlet.Timeout: pass @tests.skip_with_pyevent def test_killall(self): tpool.killall() tpool.setup() @tests.skip_with_pyevent def test_killall_remaining_results(self): semaphore = eventlet.Event() def native_fun(): time.sleep(.5) def gt_fun(): semaphore.send(None) tpool.execute(native_fun) gt = eventlet.spawn(gt_fun) semaphore.wait() tpool.killall() gt.wait() @tests.skip_with_pyevent def test_autowrap(self): x = tpool.Proxy({'a': 1, 'b': 2}, autowrap=(int,)) assert isinstance(x.get('a'), tpool.Proxy) assert not isinstance(x.items(), tpool.Proxy) # attributes as well as callables from tests import tpool_test x = tpool.Proxy(tpool_test, autowrap=(int,)) assert isinstance(x.one, tpool.Proxy) assert not isinstance(x.none, tpool.Proxy) @tests.skip_with_pyevent def test_autowrap_names(self): x = tpool.Proxy({'a': 1, 'b': 2}, autowrap_names=('get',)) assert isinstance(x.get('a'), tpool.Proxy) assert not isinstance(x.items(), tpool.Proxy) from tests import tpool_test x = tpool.Proxy(tpool_test, autowrap_names=('one',)) assert isinstance(x.one, tpool.Proxy) assert not isinstance(x.two, tpool.Proxy) @tests.skip_with_pyevent def test_autowrap_both(self): from tests import tpool_test x = tpool.Proxy(tpool_test, autowrap=(int,), autowrap_names=('one',)) assert isinstance(x.one, tpool.Proxy) # violating the abstraction to check that we didn't double-wrap assert not isinstance(x._obj, tpool.Proxy) @tests.skip_with_pyevent def test_callable(self): def wrapped(arg): return arg x = tpool.Proxy(wrapped) self.assertEqual(4, x(4)) # verify that it wraps return values if specified x = tpool.Proxy(wrapped, autowrap_names=('__call__',)) assert isinstance(x(4), tpool.Proxy) self.assertEqual("4", str(x(4))) @tests.skip_with_pyevent def test_callable_iterator(self): def wrapped(arg): yield arg yield arg yield arg x = tpool.Proxy(wrapped, autowrap_names=('__call__',)) for r in x(3): self.assertEqual(3, r) @tests.skip_with_pyevent def test_eventlet_timeout(self): def raise_timeout(): raise eventlet.Timeout() self.assertRaises(eventlet.Timeout, tpool.execute, raise_timeout) @tests.skip_with_pyevent def test_tpool_set_num_threads(self): tpool.set_num_threads(5) self.assertEqual(5, tpool._nthreads) class TpoolLongTests(tests.LimitedTestCase): TEST_TIMEOUT = 60 @tests.skip_with_pyevent def test_a_buncha_stuff(self): assert_ = self.assert_ class Dummy(object): def foo(self, when, token=None): assert_(token is not None) time.sleep(random.random() / 200.0) return token def sender_loop(loopnum): obj = tpool.Proxy(Dummy()) count = 100 for n in six.moves.range(count): eventlet.sleep(random.random() / 200.0) now = time.time() token = loopnum * count + n rv = obj.foo(now, token=token) self.assertEqual(token, rv) eventlet.sleep(random.random() / 200.0) cnt = 10 pile = eventlet.GreenPile(cnt) for i in six.moves.range(cnt): pile.spawn(sender_loop, i) results = list(pile) self.assertEqual(len(results), cnt) tpool.killall() @tests.skip_with_pyevent def test_leakage_from_tracebacks(self): tpool.execute(noop) # get it started gc.collect() initial_objs = len(gc.get_objects()) for i in range(10): self.assertRaises(RuntimeError, tpool.execute, raise_exception) gc.collect() middle_objs = len(gc.get_objects()) # some objects will inevitably be created by the previous loop # now we test to ensure that running the loop an order of # magnitude more doesn't generate additional objects for i in six.moves.range(100): self.assertRaises(RuntimeError, tpool.execute, raise_exception) first_created = middle_objs - initial_objs gc.collect() second_created = len(gc.get_objects()) - middle_objs self.assert_(second_created - first_created < 10, "first loop: %s, second loop: %s" % (first_created, second_created)) tpool.killall() def test_isolate_from_socket_default_timeout(): tests.run_isolated('tpool_isolate_socket_default_timeout.py', timeout=1)