import db_pool_test.py from https://bitbucket.org/which_linden/eventlet/
This commit is contained in:
@@ -42,15 +42,18 @@ class DBTester(object):
|
||||
cursor.close()
|
||||
|
||||
def tearDown(self):
|
||||
if self.connection is not None:
|
||||
if self.connection:
|
||||
self.connection.close()
|
||||
self.drop_db()
|
||||
|
||||
def set_up_test_table(self, connection = None):
|
||||
close_connection = False
|
||||
if connection is None:
|
||||
close_connection = True
|
||||
if self.connection is None:
|
||||
self.connection = self._dbmodule.connect(**self._auth)
|
||||
connection = self.connection
|
||||
connection = self._dbmodule.connect(**self._auth)
|
||||
else:
|
||||
connection = self.connection
|
||||
|
||||
cursor = connection.cursor()
|
||||
cursor.execute("""CREATE TEMPORARY TABLE test_table
|
||||
@@ -67,6 +70,13 @@ class DBTester(object):
|
||||
) ENGINE = InnoDB;""")
|
||||
connection.commit()
|
||||
cursor.close()
|
||||
if close_connection:
|
||||
connection.close()
|
||||
|
||||
# silly mock class
|
||||
class Mock(object):
|
||||
pass
|
||||
|
||||
|
||||
class TestDBConnectionPool(DBTester):
|
||||
def setUp(self):
|
||||
@@ -75,10 +85,12 @@ class TestDBConnectionPool(DBTester):
|
||||
self.connection = self.pool.get()
|
||||
|
||||
def tearDown(self):
|
||||
self.pool.put(self.connection)
|
||||
if self.connection:
|
||||
self.pool.put(self.connection)
|
||||
super(TestDBConnectionPool, self).tearDown()
|
||||
|
||||
def assert_cursor_works(self, cursor):
|
||||
# TODO: this is pretty mysql-specific
|
||||
cursor.execute("show full processlist")
|
||||
rows = cursor.fetchall()
|
||||
self.assert_(rows)
|
||||
@@ -185,7 +197,6 @@ class TestDBConnectionPool(DBTester):
|
||||
self.assertEqual(10, rows)
|
||||
|
||||
def test_visibility_from_other_connections(self):
|
||||
# *FIX: use some non-indra-specific table for testing (can't use a temp table)
|
||||
self.pool = self.create_pool(3)
|
||||
conn = self.pool.get()
|
||||
conn2 = self.pool.get()
|
||||
@@ -215,6 +226,7 @@ class TestDBConnectionPool(DBTester):
|
||||
|
||||
|
||||
def test_two_simultaneous_connections(self):
|
||||
""" This test is timing-sensitive. """
|
||||
self.pool = self.create_pool(2)
|
||||
conn = self.pool.get()
|
||||
self.set_up_test_table(conn)
|
||||
@@ -245,19 +257,225 @@ class TestDBConnectionPool(DBTester):
|
||||
api.spawn(short_running_query)
|
||||
evt.wait()
|
||||
evt2.wait()
|
||||
#print "results %s" % results
|
||||
results.sort()
|
||||
self.assertEqual([1, 2], results)
|
||||
|
||||
def test_clear(self):
|
||||
self.pool = self.create_pool()
|
||||
self.pool.put(self.connection)
|
||||
self.pool.clear()
|
||||
self.assertEqual(len(self.pool.free_items), 0)
|
||||
|
||||
def test_unwrap_connection(self):
|
||||
self.assert_(isinstance(self.connection,
|
||||
db_pool.GenericConnectionWrapper))
|
||||
conn = self.pool._unwrap_connection(self.connection)
|
||||
self.assert_(not isinstance(conn, db_pool.GenericConnectionWrapper))
|
||||
|
||||
self.assertEquals(None, self.pool._unwrap_connection(None))
|
||||
self.assertEquals(None, self.pool._unwrap_connection(1))
|
||||
|
||||
# testing duck typing here -- as long as the connection has a
|
||||
# _base attribute, it should be unwrappable
|
||||
x = Mock()
|
||||
x._base = 'hi'
|
||||
self.assertEquals('hi', self.pool._unwrap_connection(x))
|
||||
|
||||
def test_safe_close(self):
|
||||
self.pool._safe_close(self.connection, quiet=True)
|
||||
self.assertEquals(len(self.pool.free_items), 1)
|
||||
|
||||
self.pool._safe_close(None)
|
||||
self.pool._safe_close(1)
|
||||
|
||||
# now we're really going for 100% coverage
|
||||
x = Mock()
|
||||
def fail():
|
||||
raise KeyboardInterrupt()
|
||||
x.close = fail
|
||||
self.assertRaises(KeyboardInterrupt, self.pool._safe_close, x)
|
||||
|
||||
x = Mock()
|
||||
def fail2():
|
||||
raise RuntimeError("if this line has been printed, the test succeeded")
|
||||
x.close = fail2
|
||||
self.pool._safe_close(x, quiet=False)
|
||||
|
||||
def test_zero_max_idle(self):
|
||||
self.pool = self.create_pool(max_size=2, max_idle=0)
|
||||
self.connection = self.pool.get()
|
||||
self.connection.close()
|
||||
self.assertEquals(len(self.pool.free_items), 0)
|
||||
|
||||
def test_zero_max_age(self):
|
||||
self.pool = self.create_pool(max_size=2, max_age=0)
|
||||
self.connection = self.pool.get()
|
||||
self.connection.close()
|
||||
self.assertEquals(len(self.pool.free_items), 0)
|
||||
|
||||
def test_max_idle(self):
|
||||
# This test is timing-sensitive.
|
||||
self.pool = self.create_pool(max_size=2, max_idle=0.02)
|
||||
self.connection = self.pool.get()
|
||||
self.connection.close()
|
||||
self.assertEquals(len(self.pool.free_items), 1)
|
||||
api.sleep(0.01) # not long enough to trigger the idle timeout
|
||||
self.assertEquals(len(self.pool.free_items), 1)
|
||||
self.connection = self.pool.get()
|
||||
self.connection.close()
|
||||
self.assertEquals(len(self.pool.free_items), 1)
|
||||
api.sleep(0.01) # idle timeout should have fired but done nothing
|
||||
self.assertEquals(len(self.pool.free_items), 1)
|
||||
self.connection = self.pool.get()
|
||||
self.connection.close()
|
||||
self.assertEquals(len(self.pool.free_items), 1)
|
||||
api.sleep(0.03) # long enough to trigger idle timeout for real
|
||||
self.assertEquals(len(self.pool.free_items), 0)
|
||||
|
||||
def test_max_idle_many(self):
|
||||
# This test is timing-sensitive.
|
||||
self.pool = self.create_pool(max_size=2, max_idle=0.02)
|
||||
self.connection, conn2 = self.pool.get(), self.pool.get()
|
||||
self.connection.close()
|
||||
api.sleep(0.01)
|
||||
self.assertEquals(len(self.pool.free_items), 1)
|
||||
conn2.close()
|
||||
self.assertEquals(len(self.pool.free_items), 2)
|
||||
api.sleep(0.02) # trigger cleanup of conn1 but not conn2
|
||||
self.assertEquals(len(self.pool.free_items), 1)
|
||||
|
||||
def test_max_age(self):
|
||||
# This test is timing-sensitive.
|
||||
self.pool = self.create_pool(max_size=2, max_age=0.05)
|
||||
self.connection = self.pool.get()
|
||||
self.connection.close()
|
||||
self.assertEquals(len(self.pool.free_items), 1)
|
||||
api.sleep(0.01) # not long enough to trigger the age timeout
|
||||
self.assertEquals(len(self.pool.free_items), 1)
|
||||
self.connection = self.pool.get()
|
||||
self.connection.close()
|
||||
self.assertEquals(len(self.pool.free_items), 1)
|
||||
api.sleep(0.05) # long enough to trigger age timeout
|
||||
self.assertEquals(len(self.pool.free_items), 0)
|
||||
|
||||
def test_max_age_many(self):
|
||||
# This test is timing-sensitive.
|
||||
self.pool = self.create_pool(max_size=2, max_age=0.15)
|
||||
self.connection, conn2 = self.pool.get(), self.pool.get()
|
||||
self.connection.close()
|
||||
self.assertEquals(len(self.pool.free_items), 1)
|
||||
api.sleep(0) # not long enough to trigger the age timeout
|
||||
self.assertEquals(len(self.pool.free_items), 1)
|
||||
api.sleep(0.2) # long enough to trigger age timeout
|
||||
self.assertEquals(len(self.pool.free_items), 0)
|
||||
conn2.close() # should not be added to the free items
|
||||
self.assertEquals(len(self.pool.free_items), 0)
|
||||
|
||||
def test_connection_timeout(self):
|
||||
# use a nonexistent ip address -- this one is reserved by IANA
|
||||
self._auth['host'] = '192.0.2.1'
|
||||
pool = self.create_pool()
|
||||
self.assertRaises(db_pool.ConnectTimeout, pool.get)
|
||||
|
||||
def test_waiters_get_woken(self):
|
||||
# verify that when there's someone waiting on an empty pool
|
||||
# and someone puts an immediately-closed connection back in
|
||||
# the pool that the waiter gets woken
|
||||
self.pool = self.create_pool(max_size=1, max_age=0)
|
||||
|
||||
conn = self.pool.get()
|
||||
self.assertEquals(self.pool.free(), 0)
|
||||
self.assertEquals(self.pool.waiting(), 0)
|
||||
e = coros.event()
|
||||
def retrieve(pool, ev):
|
||||
c = pool.get()
|
||||
ev.send(c)
|
||||
api.spawn(retrieve, self.pool, e)
|
||||
api.sleep(0) # these two sleeps should advance the retrieve
|
||||
api.sleep(0) # coroutine until it's waiting in get()
|
||||
self.assertEquals(self.pool.free(), 0)
|
||||
self.assertEquals(self.pool.waiting(), 1)
|
||||
self.pool.put(conn)
|
||||
timer = api.exc_after(0.3, api.TimeoutError)
|
||||
conn = e.wait()
|
||||
timer.cancel()
|
||||
self.assertEquals(self.pool.free(), 0)
|
||||
self.assertEquals(self.pool.waiting(), 0)
|
||||
|
||||
def dont_test_0_straight_benchmark(self):
|
||||
""" Benchmark; don't run unless you want to wait a while."""
|
||||
import time
|
||||
iterations = 20000
|
||||
c = self.connection.cursor()
|
||||
self.connection.commit()
|
||||
def bench(c):
|
||||
for i in xrange(iterations):
|
||||
c.execute('select 1')
|
||||
|
||||
bench(c) # warm-up
|
||||
results = []
|
||||
for i in xrange(3):
|
||||
start = time.time()
|
||||
bench(c)
|
||||
end = time.time()
|
||||
results.append(end-start)
|
||||
|
||||
print "\n%u iterations took an average of %f seconds, (%s) in %s\n" % (
|
||||
iterations, sum(results)/len(results), results, type(self))
|
||||
|
||||
def test_raising_create(self):
|
||||
# if the create() method raises an exception the pool should
|
||||
# not lose any connections
|
||||
self.pool = self.create_pool(max_size=1, module=RaisingDBModule())
|
||||
self.assertRaises(RuntimeError, self.pool.get)
|
||||
self.assertEquals(self.pool.free(), 1)
|
||||
|
||||
|
||||
class RaisingDBModule(object):
|
||||
def connect(self, *args, **kw):
|
||||
raise RuntimeError()
|
||||
|
||||
|
||||
class TestTpoolConnectionPool(TestDBConnectionPool):
|
||||
def create_pool(self, max_items = 1):
|
||||
return db_pool.TpooledConnectionPool(self._dbmodule, 0, max_items, **self._auth)
|
||||
def create_pool(self, max_size = 1, max_idle = 10, max_age = 10, connect_timeout=0.5, module=None):
|
||||
if module is None:
|
||||
module = self._dbmodule
|
||||
return db_pool.TpooledConnectionPool(module,
|
||||
min_size=0, max_size=max_size,
|
||||
max_idle=max_idle, max_age=max_age,
|
||||
connect_timeout = connect_timeout,
|
||||
**self._auth)
|
||||
|
||||
|
||||
class TestSaranwrapConnectionPool(TestDBConnectionPool):
|
||||
def create_pool(self, max_items = 1):
|
||||
return db_pool.SaranwrappedConnectionPool(self._dbmodule, 0, max_items, **self._auth)
|
||||
def create_pool(self, max_size = 1, max_idle = 10, max_age = 10, connect_timeout= 0.5, module=None):
|
||||
if module is None:
|
||||
module = self._dbmodule
|
||||
return db_pool.SaranwrappedConnectionPool(module,
|
||||
min_size=0, max_size=max_size,
|
||||
max_idle=max_idle, max_age=max_age,
|
||||
connect_timeout=connect_timeout,
|
||||
**self._auth)
|
||||
|
||||
def test_raising_create(self):
|
||||
# *TODO: this fails because of saranwrap's unwillingness to
|
||||
# wrap objects in tests, but it should be fixable
|
||||
pass
|
||||
|
||||
|
||||
class TestRawConnectionPool(TestDBConnectionPool):
|
||||
def create_pool(self, max_size = 1, max_idle = 10, max_age = 10, connect_timeout= 0.5, module=None):
|
||||
if module is None:
|
||||
module = self._dbmodule
|
||||
return db_pool.RawConnectionPool(module,
|
||||
min_size=0, max_size=max_size,
|
||||
max_idle=max_idle, max_age=max_age,
|
||||
connect_timeout=connect_timeout,
|
||||
**self._auth)
|
||||
|
||||
def test_connection_timeout(self):
|
||||
pass # not gonna work for raw connections because they're not nonblocking
|
||||
|
||||
|
||||
class TestMysqlConnectionPool(object):
|
||||
def setUp(self):
|
||||
@@ -273,6 +491,9 @@ class TestMysqlConnectionPool(object):
|
||||
except (IOError, ImportError), e:
|
||||
self._auth = {'host': 'localhost','user': 'root','passwd': '','db': 'persist0'}
|
||||
super(TestMysqlConnectionPool, self).setUp()
|
||||
|
||||
def tearDown(self):
|
||||
pass
|
||||
|
||||
def create_db(self):
|
||||
auth = self._auth.copy()
|
||||
@@ -292,10 +513,15 @@ class TestMysqlConnectionPool(object):
|
||||
db.close()
|
||||
del db
|
||||
|
||||
class TestMysqlTpool(TestMysqlConnectionPool, TestTpoolConnectionPool, tests.TestCase):
|
||||
|
||||
# for some reason the tpool test hangs if run after the saranwrap test
|
||||
class Test01MysqlTpool(TestMysqlConnectionPool, TestTpoolConnectionPool, tests.TestCase):
|
||||
pass
|
||||
|
||||
class TestMysqlSaranwrap(TestMysqlConnectionPool, TestSaranwrapConnectionPool, tests.TestCase):
|
||||
class Test02MysqlSaranwrap(TestMysqlConnectionPool, TestSaranwrapConnectionPool, tests.TestCase):
|
||||
pass
|
||||
|
||||
class Test03MysqlRaw(TestMysqlConnectionPool, TestRawConnectionPool, tests.TestCase):
|
||||
pass
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user