fix the skip print notices in mysql_requirement so they are actually visible on stderr when run from nose (plus some stuff pyflakes recommended and a few cosmetic ws and line-len changes)
This commit is contained in:
@@ -1,11 +1,13 @@
|
||||
"Test cases for db_pool"
|
||||
import sys
|
||||
import os
|
||||
from unittest import TestCase, main
|
||||
|
||||
from tests import skipped, skip_unless, skip_with_pyevent
|
||||
from unittest import TestCase, main
|
||||
from eventlet import event
|
||||
from eventlet import db_pool
|
||||
import eventlet
|
||||
import os
|
||||
|
||||
|
||||
class DBTester(object):
|
||||
__test__ = False # so that nose doesn't try to execute this directly
|
||||
@@ -84,7 +86,7 @@ class DBConnectionPool(DBTester):
|
||||
self.assert_(False)
|
||||
except AssertionError:
|
||||
raise
|
||||
except Exception, e:
|
||||
except Exception:
|
||||
pass
|
||||
cursor.close()
|
||||
|
||||
@@ -144,7 +146,6 @@ class DBConnectionPool(DBTester):
|
||||
curs.execute(SHORT_QUERY)
|
||||
results.append(2)
|
||||
evt.send()
|
||||
evt2 = event.Event()
|
||||
eventlet.spawn(a_query)
|
||||
results.append(1)
|
||||
self.assertEqual([1], results)
|
||||
@@ -299,7 +300,10 @@ class DBConnectionPool(DBTester):
|
||||
|
||||
@skipped
|
||||
def test_max_idle(self):
|
||||
# This test is timing-sensitive. Rename the function without the "dont" to run it, but beware that it could fail or take a while.
|
||||
# This test is timing-sensitive. Rename the function without
|
||||
# the "dont" to run it, but beware that it could fail or take
|
||||
# a while.
|
||||
|
||||
self.pool = self.create_pool(max_size=2, max_idle=0.02)
|
||||
self.connection = self.pool.get()
|
||||
self.connection.close()
|
||||
@@ -319,7 +323,10 @@ class DBConnectionPool(DBTester):
|
||||
|
||||
@skipped
|
||||
def test_max_idle_many(self):
|
||||
# This test is timing-sensitive. Rename the function without the "dont" to run it, but beware that it could fail or take a while.
|
||||
# This test is timing-sensitive. Rename the function without
|
||||
# the "dont" to run it, but beware that it could fail or take
|
||||
# a while.
|
||||
|
||||
self.pool = self.create_pool(max_size=2, max_idle=0.02)
|
||||
self.connection, conn2 = self.pool.get(), self.pool.get()
|
||||
self.connection.close()
|
||||
@@ -332,7 +339,10 @@ class DBConnectionPool(DBTester):
|
||||
|
||||
@skipped
|
||||
def test_max_age(self):
|
||||
# This test is timing-sensitive. Rename the function without the "dont" to run it, but beware that it could fail or take a while.
|
||||
# This test is timing-sensitive. Rename the function without
|
||||
# the "dont" to run it, but beware that it could fail or take
|
||||
# a while.
|
||||
|
||||
self.pool = self.create_pool(max_size=2, max_age=0.05)
|
||||
self.connection = self.pool.get()
|
||||
self.connection.close()
|
||||
@@ -347,7 +357,10 @@ class DBConnectionPool(DBTester):
|
||||
|
||||
@skipped
|
||||
def test_max_age_many(self):
|
||||
# This test is timing-sensitive. Rename the function without the "dont" to run it, but beware that it could fail or take a while.
|
||||
# This test is timing-sensitive. Rename the function without
|
||||
# the "dont" to run it, but beware that it could fail or take
|
||||
# a while.
|
||||
|
||||
self.pool = self.create_pool(max_size=2, max_age=0.15)
|
||||
self.connection, conn2 = self.pool.get(), self.pool.get()
|
||||
self.connection.close()
|
||||
@@ -424,7 +437,8 @@ class RaisingDBModule(object):
|
||||
|
||||
class TpoolConnectionPool(DBConnectionPool):
|
||||
__test__ = False # so that nose doesn't try to execute this directly
|
||||
def create_pool(self, max_size = 1, max_idle = 10, max_age = 10, connect_timeout=0.5, module=None):
|
||||
def create_pool(self, max_size=1, max_idle=10, max_age=10,
|
||||
connect_timeout=0.5, module=None):
|
||||
if module is None:
|
||||
module = self._dbmodule
|
||||
return db_pool.TpooledConnectionPool(module,
|
||||
@@ -447,7 +461,8 @@ class TpoolConnectionPool(DBConnectionPool):
|
||||
|
||||
class RawConnectionPool(DBConnectionPool):
|
||||
__test__ = False # so that nose doesn't try to execute this directly
|
||||
def create_pool(self, max_size = 1, max_idle = 10, max_age = 10, connect_timeout= 0.5, module=None):
|
||||
def create_pool(self, max_size=1, max_idle=10, max_age=10,
|
||||
connect_timeout=0.5, module=None):
|
||||
if module is None:
|
||||
module = self._dbmodule
|
||||
return db_pool.RawConnectionPool(module,
|
||||
@@ -458,8 +473,9 @@ class RawConnectionPool(DBConnectionPool):
|
||||
|
||||
|
||||
def get_auth():
|
||||
"""Looks in the local directory and in the user's home directory for a file named ".test_dbauth",
|
||||
which contains a json map of parameters to the connect function.
|
||||
"""Looks in the local directory and in the user's home directory
|
||||
for a file named ".test_dbauth", which contains a json map of
|
||||
parameters to the connect function.
|
||||
"""
|
||||
files = [os.path.join(os.path.dirname(__file__), '.test_dbauth'),
|
||||
os.path.join(os.path.expanduser('~'), '.test_dbauth')]
|
||||
@@ -473,7 +489,7 @@ def get_auth():
|
||||
return dict([(str(modname), dict([(str(k), str(v))
|
||||
for k, v in connectargs.items()]))
|
||||
for modname, connectargs in auth_utf8.items()])
|
||||
except (IOError, ImportError), e:
|
||||
except (IOError, ImportError):
|
||||
pass
|
||||
return {'MySQLdb':{'host': 'localhost','user': 'root','passwd': ''},
|
||||
'psycopg2':{'user':'test'}}
|
||||
@@ -487,12 +503,12 @@ def mysql_requirement(_f):
|
||||
MySQLdb.connect(**auth)
|
||||
return True
|
||||
except MySQLdb.OperationalError:
|
||||
print "Skipping mysql tests, error when connecting"
|
||||
print >> sys.stderr, ">> Skipping mysql tests, error when connecting:"
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
return False
|
||||
except ImportError:
|
||||
print "Skipping mysql tests, MySQLdb not importable"
|
||||
print >> sys.stderr, ">> Skipping mysql tests, MySQLdb not importable"
|
||||
return False
|
||||
|
||||
class MysqlConnectionPool(object):
|
||||
@@ -600,7 +616,6 @@ class Psycopg2ConnectionPool(object):
|
||||
|
||||
def drop_db(self):
|
||||
auth = self._auth.copy()
|
||||
dbname = auth.pop('database')
|
||||
conn = self._dbmodule.connect(**auth)
|
||||
conn.set_isolation_level(0)
|
||||
db = conn.cursor()
|
||||
|
Reference in New Issue
Block a user