diff --git a/tests/db_pool_test.py b/tests/db_pool_test.py index 0d4d64e..2a77311 100644 --- a/tests/db_pool_test.py +++ b/tests/db_pool_test.py @@ -21,12 +21,13 @@ # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. -from tests import skipped +from tests import skipped, skip_unless_requirement from unittest import TestCase, main from eventlet import api, coros from eventlet import db_pool class DBTester(object): + __test__ = False # so that nose doesn't try to execute this directly def setUp(self): self.create_db() self.connection = None @@ -77,6 +78,7 @@ class Mock(object): class TestDBConnectionPool(DBTester): + __test__ = False # so that nose doesn't try to execute this directly def setUp(self): super(TestDBConnectionPool, self).setUp() self.pool = self.create_pool() @@ -444,6 +446,7 @@ class RaisingDBModule(object): class TestTpoolConnectionPool(TestDBConnectionPool): + __test__ = False # so that nose doesn't try to execute this directly def create_pool(self, max_size = 1, max_idle = 10, max_age = 10, connect_timeout=0.5, module=None): if module is None: module = self._dbmodule @@ -466,6 +469,7 @@ class TestTpoolConnectionPool(TestDBConnectionPool): class TestSaranwrapConnectionPool(TestDBConnectionPool): + __test__ = False # so that nose doesn't try to execute this directly def create_pool(self, max_size = 1, max_idle = 10, max_age = 10, connect_timeout= 0.5, module=None): if module is None: module = self._dbmodule @@ -482,6 +486,7 @@ class TestSaranwrapConnectionPool(TestDBConnectionPool): class TestRawConnectionPool(TestDBConnectionPool): + __test__ = False # so that nose doesn't try to execute this directly def create_pool(self, max_size = 1, max_idle = 10, max_age = 10, connect_timeout= 0.5, module=None): if module is None: module = self._dbmodule @@ -495,19 +500,37 @@ class TestRawConnectionPool(TestDBConnectionPool): pass # not gonna work for raw connections because they're not nonblocking +def get_auth(): + try: + import simplejson + import os.path + auth_utf8 = simplejson.load(open(os.path.join(os.path.dirname(__file__), 'auth.json'))) + # have to convert unicode objects to str objects because mysqldb is dum + return dict([(str(k), str(v)) + for k, v in auth_utf8.items()]) + except (IOError, ImportError), e: + return {'host': 'localhost','user': 'root','passwd': '','db': 'persist0'} + + +def mysql_requirement(_f): + try: + import MySQLdb + try: + MySQLdb.connect(**get_auth()) + return True + except MySQLdb.OperationalError: + return False + except ImportError: + return False + class TestMysqlConnectionPool(object): + __test__ = True + + @skip_unless_requirement(mysql_requirement) def setUp(self): import MySQLdb self._dbmodule = MySQLdb - try: - import simplejson - import os.path - auth_utf8 = simplejson.load(open(os.path.join(os.path.dirname(__file__), 'auth.json'))) - # have to convert unicode objects to str objects because mysqldb is dum - self._auth = dict([(str(k), str(v)) - for k, v in auth_utf8.items()]) - except (IOError, ImportError), e: - self._auth = {'host': 'localhost','user': 'root','passwd': '','db': 'persist0'} + self._auth = get_auth() super(TestMysqlConnectionPool, self).setUp() def tearDown(self): @@ -543,12 +566,6 @@ class Test03MysqlRaw(TestMysqlConnectionPool, TestRawConnectionPool, TestCase): pass + if __name__ == '__main__': - try: - import MySQLdb - except ImportError: - print "Unable to import MySQLdb, skipping db_pool_test." - else: - main() -else: - import MySQLdb + main() diff --git a/tests/stdlib/test_threading.py b/tests/stdlib/test_threading.py new file mode 100644 index 0000000..5fcc4b4 --- /dev/null +++ b/tests/stdlib/test_threading.py @@ -0,0 +1,20 @@ +# Very rudimentary test of threading module + +from eventlet.green import threading +from eventlet.green import thread +from eventlet.green import time + +# need to override these modules before import so +# that classes inheriting from threading.Thread refer +# to the correct parent class +import sys +sys.modules['threading'] = threading + +from test import test_threading +test_threading.thread = thread +test_threading.time = time + +from test.test_threading import * + +if __name__ == "__main__": + test_main() \ No newline at end of file diff --git a/tests/test_threading.py b/tests/test_threading.py deleted file mode 100644 index 22b856c..0000000 --- a/tests/test_threading.py +++ /dev/null @@ -1,306 +0,0 @@ -# Very rudimentary test of threading module - -import tests.test_support -from tests.test_support import verbose -import random -import sys -from eventlet.green import threading -from eventlet.green import thread -from eventlet.green import time -import unittest - -# A trivial mutable counter. -class Counter(object): - def __init__(self): - self.value = 0 - def inc(self): - self.value += 1 - def dec(self): - self.value -= 1 - def get(self): - return self.value - -class TestThread(threading.Thread): - def __init__(self, name, testcase, sema, mutex, nrunning): - threading.Thread.__init__(self, name=name) - self.testcase = testcase - self.sema = sema - self.mutex = mutex - self.nrunning = nrunning - - def run(self): - delay = random.random() * 0.1 - if verbose: - print 'task', self.getName(), 'will run for', delay, 'sec' - - self.sema.acquire() - - self.mutex.acquire() - self.nrunning.inc() - if verbose: - print self.nrunning.get(), 'tasks are running' - self.testcase.assert_(self.nrunning.get() <= 3) - self.mutex.release() - - time.sleep(delay) - if verbose: - print 'task', self.getName(), 'done' - - self.mutex.acquire() - self.nrunning.dec() - self.testcase.assert_(self.nrunning.get() >= 0) - if verbose: - print self.getName(), 'is finished.', self.nrunning.get(), \ - 'tasks are running' - self.mutex.release() - - self.sema.release() - -class ThreadTests(unittest.TestCase): - - # Create a bunch of threads, let each do some work, wait until all are - # done. - def test_various_ops(self): - # This takes about n/3 seconds to run (about n/3 clumps of tasks, - # times about 1 second per clump). - NUMTASKS = 10 - - # no more than 3 of the 10 can run at once - sema = threading.BoundedSemaphore(value=3) - mutex = threading.RLock() - numrunning = Counter() - - threads = [] - - for i in range(NUMTASKS): - t = TestThread(""%i, self, sema, mutex, numrunning) - threads.append(t) - t.start() - - if verbose: - print 'waiting for all tasks to complete' - for t in threads: - t.join(NUMTASKS) - self.assert_(not t.isAlive()) - if verbose: - print 'all tasks done' - self.assertEqual(numrunning.get(), 0) - - # run with a small(ish) thread stack size (256kB) - if hasattr(threading, 'stack_size'): - def test_various_ops_small_stack(self): - if verbose: - print 'with 256kB thread stack size...' - try: - threading.stack_size(262144) - except thread.error: - if verbose: - print 'platform does not support changing thread stack size' - return - self.test_various_ops() - threading.stack_size(0) - - # run with a large thread stack size (1MB) - def test_various_ops_large_stack(self): - if verbose: - print 'with 1MB thread stack size...' - try: - threading.stack_size(0x100000) - except thread.error: - if verbose: - print 'platform does not support changing thread stack size' - return - self.test_various_ops() - threading.stack_size(0) - - def test_foreign_thread(self): - # Check that a "foreign" thread can use the threading module. - def f(mutex): - # Acquiring an RLock forces an entry for the foreign - # thread to get made in the threading._active map. - r = threading.RLock() - r.acquire() - r.release() - mutex.release() - - mutex = threading.Lock() - mutex.acquire() - tid = thread.start_new_thread(f, (mutex,)) - # Wait for the thread to finish. - mutex.acquire() - self.assert_(tid in threading._active) - self.assert_(isinstance(threading._active[tid], - threading._DummyThread)) - del threading._active[tid] - - # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently) - # exposed at the Python level. This test relies on ctypes to get at it. - def test_PyThreadState_SetAsyncExc(self): - try: - import ctypes - except ImportError: - if verbose: - print "test_PyThreadState_SetAsyncExc can't import ctypes" - return # can't do anything - - set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc - - class AsyncExc(Exception): - pass - - exception = ctypes.py_object(AsyncExc) - - # `worker_started` is set by the thread when it's inside a try/except - # block waiting to catch the asynchronously set AsyncExc exception. - # `worker_saw_exception` is set by the thread upon catching that - # exception. - worker_started = threading.Event() - worker_saw_exception = threading.Event() - - class Worker(threading.Thread): - def run(self): - self.id = thread.get_ident() - self.finished = False - - try: - while True: - worker_started.set() - time.sleep(0.1) - except AsyncExc: - self.finished = True - worker_saw_exception.set() - - t = Worker() - t.setDaemon(True) # so if this fails, we don't hang Python at shutdown - t.start() - if verbose: - print " started worker thread" - - # Try a thread id that doesn't make sense. - if verbose: - print " trying nonsensical thread id" - result = set_async_exc(ctypes.c_long(-1), exception) - self.assertEqual(result, 0) # no thread states modified - - # Now raise an exception in the worker thread. - if verbose: - print " waiting for worker thread to get started" - worker_started.wait() - if verbose: - print " verifying worker hasn't exited" - self.assert_(not t.finished) -# if verbose: -# print " attempting to raise asynch exception in worker" -# result = set_async_exc(ctypes.c_long(t.id), exception) -# self.assertEqual(result, 1) # one thread state modified -# if verbose: -# print " waiting for worker to say it caught the exception" -# worker_saw_exception.wait(timeout=10) -# self.assert_(t.finished) - if verbose: - print " all OK(2 disabled) -- joining worker" - if t.finished: - t.join() - # else the thread is still running, and we have no way to kill it - - def test_enumerate_after_join(self): - # Try hard to trigger #1703448: a thread is still returned in - # threading.enumerate() after it has been join()ed. - enum = threading.enumerate - old_interval = sys.getcheckinterval() - sys.setcheckinterval(1) - try: - for i in xrange(1, 1000): - t = threading.Thread(target=lambda: None) - t.start() - t.join() - l = enum() - self.assertFalse(t in l, - "#1703448 triggered after %d trials: %s" % (i, l)) - finally: - sys.setcheckinterval(old_interval) - - -class ThreadJoinOnShutdown(unittest.TestCase): - - def _run_and_join(self, script): - script = """if 1: - import sys, os, time, threading - - # a thread, which waits for the main program to terminate - def joiningfunc(mainthread): - mainthread.join() - print 'end of thread' - \n""" + script - - import subprocess - p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE) - rc = p.wait() - data = p.stdout.read().replace('\r', '') - self.assertEqual(data, "end of main\nend of thread\n") - self.failIf(rc == 2, "interpreter was blocked") - self.failUnless(rc == 0, "Unexpected error") - - def test_1_join_on_shutdown(self): - # The usual case: on exit, wait for a non-daemon thread - script = """if 1: - import os - t = threading.Thread(target=joiningfunc, - args=(threading.currentThread(),)) - t.start() - time.sleep(0.1) - print 'end of main' - """ - self._run_and_join(script) - - - def test_2_join_in_forked_process(self): - # Like the test above, but from a forked interpreter - import os - if not hasattr(os, 'fork'): - return - script = """if 1: - childpid = os.fork() - if childpid != 0: - os.waitpid(childpid, 0) - sys.exit(0) - - t = threading.Thread(target=joiningfunc, - args=(threading.currentThread(),)) - t.start() - print 'end of main' - """ - self._run_and_join(script) - - def test_3_join_in_forked_from_thread(self): - # Like the test above, but fork() was called from a worker thread - # In the forked process, the main Thread object must be marked as stopped. - import os - if not hasattr(os, 'fork'): - return - script = """if 1: - main_thread = threading.currentThread() - def worker(): - childpid = os.fork() - if childpid != 0: - os.waitpid(childpid, 0) - sys.exit(0) - - t = threading.Thread(target=joiningfunc, - args=(main_thread,)) - print 'end of main' - t.start() - t.join() # Should not block: main_thread is already stopped - - w = threading.Thread(target=worker) - w.start() - """ - self._run_and_join(script) - - -def test_main(): - tests.test_support.run_unittest(ThreadTests, - ThreadJoinOnShutdown) - -if __name__ == "__main__": - test_main() diff --git a/tests/tpool_test.py b/tests/tpool_test.py index 672b040..35fc2a6 100644 --- a/tests/tpool_test.py +++ b/tests/tpool_test.py @@ -13,10 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +import random from sys import stdout import time +from tests import skipped from unittest import TestCase, main -import random import uuid from eventlet import coros, api, tpool @@ -183,8 +184,9 @@ class TestTpool(TestCase): tpool.execute, time.sleep, 0.3) - def dont_test_benchmark(self): - """ Benchmark computing the amount of overhead tpool adds to function calls. Rename to activate.""" + @skipped + def test_benchmark(self): + """ Benchmark computing the amount of overhead tpool adds to function calls.""" iterations = 10000 def bench(f, *args, **kw): for i in xrange(iterations):