From f26d49ccf2967fe63078d18f987a41dbf22456bd Mon Sep 17 00:00:00 2001 From: Denis Bilenko Date: Tue, 19 May 2009 20:22:25 +0700 Subject: [PATCH] import db_pool_test.py from https://bitbucket.org/which_linden/eventlet/ --- greentest/db_pool_test.py | 250 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 238 insertions(+), 12 deletions(-) diff --git a/greentest/db_pool_test.py b/greentest/db_pool_test.py index 981febf..671ac27 100644 --- a/greentest/db_pool_test.py +++ b/greentest/db_pool_test.py @@ -42,15 +42,18 @@ class DBTester(object): cursor.close() def tearDown(self): - if self.connection is not None: + if self.connection: self.connection.close() self.drop_db() def set_up_test_table(self, connection = None): + close_connection = False if connection is None: + close_connection = True if self.connection is None: - self.connection = self._dbmodule.connect(**self._auth) - connection = self.connection + connection = self._dbmodule.connect(**self._auth) + else: + connection = self.connection cursor = connection.cursor() cursor.execute("""CREATE TEMPORARY TABLE test_table @@ -67,6 +70,13 @@ class DBTester(object): ) ENGINE = InnoDB;""") connection.commit() cursor.close() + if close_connection: + connection.close() + +# silly mock class +class Mock(object): + pass + class TestDBConnectionPool(DBTester): def setUp(self): @@ -75,10 +85,12 @@ class TestDBConnectionPool(DBTester): self.connection = self.pool.get() def tearDown(self): - self.pool.put(self.connection) + if self.connection: + self.pool.put(self.connection) super(TestDBConnectionPool, self).tearDown() def assert_cursor_works(self, cursor): + # TODO: this is pretty mysql-specific cursor.execute("show full processlist") rows = cursor.fetchall() self.assert_(rows) @@ -185,7 +197,6 @@ class TestDBConnectionPool(DBTester): self.assertEqual(10, rows) def test_visibility_from_other_connections(self): - # *FIX: use some non-indra-specific table for testing (can't use a temp table) self.pool = self.create_pool(3) conn = self.pool.get() conn2 = self.pool.get() @@ -215,6 +226,7 @@ class TestDBConnectionPool(DBTester): def test_two_simultaneous_connections(self): + """ This test is timing-sensitive. """ self.pool = self.create_pool(2) conn = self.pool.get() self.set_up_test_table(conn) @@ -245,19 +257,225 @@ class TestDBConnectionPool(DBTester): api.spawn(short_running_query) evt.wait() evt2.wait() - #print "results %s" % results results.sort() self.assertEqual([1, 2], results) + + def test_clear(self): + self.pool = self.create_pool() + self.pool.put(self.connection) + self.pool.clear() + self.assertEqual(len(self.pool.free_items), 0) + + def test_unwrap_connection(self): + self.assert_(isinstance(self.connection, + db_pool.GenericConnectionWrapper)) + conn = self.pool._unwrap_connection(self.connection) + self.assert_(not isinstance(conn, db_pool.GenericConnectionWrapper)) + + self.assertEquals(None, self.pool._unwrap_connection(None)) + self.assertEquals(None, self.pool._unwrap_connection(1)) + + # testing duck typing here -- as long as the connection has a + # _base attribute, it should be unwrappable + x = Mock() + x._base = 'hi' + self.assertEquals('hi', self.pool._unwrap_connection(x)) + + def test_safe_close(self): + self.pool._safe_close(self.connection, quiet=True) + self.assertEquals(len(self.pool.free_items), 1) + + self.pool._safe_close(None) + self.pool._safe_close(1) + + # now we're really going for 100% coverage + x = Mock() + def fail(): + raise KeyboardInterrupt() + x.close = fail + self.assertRaises(KeyboardInterrupt, self.pool._safe_close, x) + x = Mock() + def fail2(): + raise RuntimeError("if this line has been printed, the test succeeded") + x.close = fail2 + self.pool._safe_close(x, quiet=False) + + def test_zero_max_idle(self): + self.pool = self.create_pool(max_size=2, max_idle=0) + self.connection = self.pool.get() + self.connection.close() + self.assertEquals(len(self.pool.free_items), 0) + + def test_zero_max_age(self): + self.pool = self.create_pool(max_size=2, max_age=0) + self.connection = self.pool.get() + self.connection.close() + self.assertEquals(len(self.pool.free_items), 0) + + def test_max_idle(self): + # This test is timing-sensitive. + self.pool = self.create_pool(max_size=2, max_idle=0.02) + self.connection = self.pool.get() + self.connection.close() + self.assertEquals(len(self.pool.free_items), 1) + api.sleep(0.01) # not long enough to trigger the idle timeout + self.assertEquals(len(self.pool.free_items), 1) + self.connection = self.pool.get() + self.connection.close() + self.assertEquals(len(self.pool.free_items), 1) + api.sleep(0.01) # idle timeout should have fired but done nothing + self.assertEquals(len(self.pool.free_items), 1) + self.connection = self.pool.get() + self.connection.close() + self.assertEquals(len(self.pool.free_items), 1) + api.sleep(0.03) # long enough to trigger idle timeout for real + self.assertEquals(len(self.pool.free_items), 0) + + def test_max_idle_many(self): + # This test is timing-sensitive. + self.pool = self.create_pool(max_size=2, max_idle=0.02) + self.connection, conn2 = self.pool.get(), self.pool.get() + self.connection.close() + api.sleep(0.01) + self.assertEquals(len(self.pool.free_items), 1) + conn2.close() + self.assertEquals(len(self.pool.free_items), 2) + api.sleep(0.02) # trigger cleanup of conn1 but not conn2 + self.assertEquals(len(self.pool.free_items), 1) + + def test_max_age(self): + # This test is timing-sensitive. + self.pool = self.create_pool(max_size=2, max_age=0.05) + self.connection = self.pool.get() + self.connection.close() + self.assertEquals(len(self.pool.free_items), 1) + api.sleep(0.01) # not long enough to trigger the age timeout + self.assertEquals(len(self.pool.free_items), 1) + self.connection = self.pool.get() + self.connection.close() + self.assertEquals(len(self.pool.free_items), 1) + api.sleep(0.05) # long enough to trigger age timeout + self.assertEquals(len(self.pool.free_items), 0) + + def test_max_age_many(self): + # This test is timing-sensitive. + self.pool = self.create_pool(max_size=2, max_age=0.15) + self.connection, conn2 = self.pool.get(), self.pool.get() + self.connection.close() + self.assertEquals(len(self.pool.free_items), 1) + api.sleep(0) # not long enough to trigger the age timeout + self.assertEquals(len(self.pool.free_items), 1) + api.sleep(0.2) # long enough to trigger age timeout + self.assertEquals(len(self.pool.free_items), 0) + conn2.close() # should not be added to the free items + self.assertEquals(len(self.pool.free_items), 0) + + def test_connection_timeout(self): + # use a nonexistent ip address -- this one is reserved by IANA + self._auth['host'] = '192.0.2.1' + pool = self.create_pool() + self.assertRaises(db_pool.ConnectTimeout, pool.get) + + def test_waiters_get_woken(self): + # verify that when there's someone waiting on an empty pool + # and someone puts an immediately-closed connection back in + # the pool that the waiter gets woken + self.pool = self.create_pool(max_size=1, max_age=0) + + conn = self.pool.get() + self.assertEquals(self.pool.free(), 0) + self.assertEquals(self.pool.waiting(), 0) + e = coros.event() + def retrieve(pool, ev): + c = pool.get() + ev.send(c) + api.spawn(retrieve, self.pool, e) + api.sleep(0) # these two sleeps should advance the retrieve + api.sleep(0) # coroutine until it's waiting in get() + self.assertEquals(self.pool.free(), 0) + self.assertEquals(self.pool.waiting(), 1) + self.pool.put(conn) + timer = api.exc_after(0.3, api.TimeoutError) + conn = e.wait() + timer.cancel() + self.assertEquals(self.pool.free(), 0) + self.assertEquals(self.pool.waiting(), 0) + + def dont_test_0_straight_benchmark(self): + """ Benchmark; don't run unless you want to wait a while.""" + import time + iterations = 20000 + c = self.connection.cursor() + self.connection.commit() + def bench(c): + for i in xrange(iterations): + c.execute('select 1') + + bench(c) # warm-up + results = [] + for i in xrange(3): + start = time.time() + bench(c) + end = time.time() + results.append(end-start) + + print "\n%u iterations took an average of %f seconds, (%s) in %s\n" % ( + iterations, sum(results)/len(results), results, type(self)) + + def test_raising_create(self): + # if the create() method raises an exception the pool should + # not lose any connections + self.pool = self.create_pool(max_size=1, module=RaisingDBModule()) + self.assertRaises(RuntimeError, self.pool.get) + self.assertEquals(self.pool.free(), 1) + + +class RaisingDBModule(object): + def connect(self, *args, **kw): + raise RuntimeError() + class TestTpoolConnectionPool(TestDBConnectionPool): - def create_pool(self, max_items = 1): - return db_pool.TpooledConnectionPool(self._dbmodule, 0, max_items, **self._auth) + def create_pool(self, max_size = 1, max_idle = 10, max_age = 10, connect_timeout=0.5, module=None): + if module is None: + module = self._dbmodule + return db_pool.TpooledConnectionPool(module, + min_size=0, max_size=max_size, + max_idle=max_idle, max_age=max_age, + connect_timeout = connect_timeout, + **self._auth) class TestSaranwrapConnectionPool(TestDBConnectionPool): - def create_pool(self, max_items = 1): - return db_pool.SaranwrappedConnectionPool(self._dbmodule, 0, max_items, **self._auth) + def create_pool(self, max_size = 1, max_idle = 10, max_age = 10, connect_timeout= 0.5, module=None): + if module is None: + module = self._dbmodule + return db_pool.SaranwrappedConnectionPool(module, + min_size=0, max_size=max_size, + max_idle=max_idle, max_age=max_age, + connect_timeout=connect_timeout, + **self._auth) + + def test_raising_create(self): + # *TODO: this fails because of saranwrap's unwillingness to + # wrap objects in tests, but it should be fixable + pass + + +class TestRawConnectionPool(TestDBConnectionPool): + def create_pool(self, max_size = 1, max_idle = 10, max_age = 10, connect_timeout= 0.5, module=None): + if module is None: + module = self._dbmodule + return db_pool.RawConnectionPool(module, + min_size=0, max_size=max_size, + max_idle=max_idle, max_age=max_age, + connect_timeout=connect_timeout, + **self._auth) + + def test_connection_timeout(self): + pass # not gonna work for raw connections because they're not nonblocking + class TestMysqlConnectionPool(object): def setUp(self): @@ -273,6 +491,9 @@ class TestMysqlConnectionPool(object): except (IOError, ImportError), e: self._auth = {'host': 'localhost','user': 'root','passwd': '','db': 'persist0'} super(TestMysqlConnectionPool, self).setUp() + + def tearDown(self): + pass def create_db(self): auth = self._auth.copy() @@ -292,10 +513,15 @@ class TestMysqlConnectionPool(object): db.close() del db -class TestMysqlTpool(TestMysqlConnectionPool, TestTpoolConnectionPool, tests.TestCase): + +# for some reason the tpool test hangs if run after the saranwrap test +class Test01MysqlTpool(TestMysqlConnectionPool, TestTpoolConnectionPool, tests.TestCase): pass -class TestMysqlSaranwrap(TestMysqlConnectionPool, TestSaranwrapConnectionPool, tests.TestCase): +class Test02MysqlSaranwrap(TestMysqlConnectionPool, TestSaranwrapConnectionPool, tests.TestCase): + pass + +class Test03MysqlRaw(TestMysqlConnectionPool, TestRawConnectionPool, tests.TestCase): pass