
If it's determined that a MySQL test should be skipped during setUp(), but after the parent setUp() has been called, having created and started a timer, then tearDown() is never called and the timer never cancelled, but left to expire during some future, unrelated test. Move call to parent setUp() to end of child setUp() to insure that the timer is only created and started after it's been determined that the test shouldn't be skipped (ie create_db()).
245 lines
7.5 KiB
Python
245 lines
7.5 KiB
Python
from __future__ import print_function
|
|
|
|
import os
|
|
import time
|
|
import traceback
|
|
|
|
import eventlet
|
|
from eventlet import event
|
|
from tests import (
|
|
LimitedTestCase,
|
|
run_python,
|
|
skip_unless, using_pyevent, get_database_auth,
|
|
)
|
|
try:
|
|
from eventlet.green import MySQLdb
|
|
except ImportError:
|
|
MySQLdb = False
|
|
|
|
|
|
def mysql_requirement(_f):
|
|
"""We want to skip tests if using pyevent, MySQLdb is not installed, or if
|
|
there is no database running on the localhost that the auth file grants
|
|
us access to.
|
|
|
|
This errs on the side of skipping tests if everything is not right, but
|
|
it's better than a million tests failing when you don't care about mysql
|
|
support."""
|
|
if using_pyevent(_f):
|
|
return False
|
|
if MySQLdb is False:
|
|
print("Skipping mysql tests, MySQLdb not importable")
|
|
return False
|
|
try:
|
|
auth = get_database_auth()['MySQLdb'].copy()
|
|
MySQLdb.connect(**auth)
|
|
return True
|
|
except MySQLdb.OperationalError:
|
|
print("Skipping mysql tests, error when connecting:")
|
|
traceback.print_exc()
|
|
return False
|
|
|
|
|
|
class TestMySQLdb(LimitedTestCase):
|
|
def setUp(self):
|
|
self._auth = get_database_auth()['MySQLdb']
|
|
self.create_db()
|
|
self.connection = None
|
|
self.connection = MySQLdb.connect(**self._auth)
|
|
cursor = self.connection.cursor()
|
|
cursor.execute("""CREATE TABLE gargleblatz
|
|
(
|
|
a INTEGER
|
|
);""")
|
|
self.connection.commit()
|
|
cursor.close()
|
|
|
|
super(TestMySQLdb, self).setUp()
|
|
|
|
def tearDown(self):
|
|
if self.connection:
|
|
self.connection.close()
|
|
self.drop_db()
|
|
|
|
super(TestMySQLdb, self).tearDown()
|
|
|
|
@skip_unless(mysql_requirement)
|
|
def create_db(self):
|
|
auth = self._auth.copy()
|
|
try:
|
|
self.drop_db()
|
|
except Exception:
|
|
pass
|
|
dbname = 'test_%d_%d' % (os.getpid(), int(time.time() * 1000))
|
|
db = MySQLdb.connect(**auth).cursor()
|
|
db.execute("create database " + dbname)
|
|
db.close()
|
|
self._auth['db'] = dbname
|
|
del db
|
|
|
|
def drop_db(self):
|
|
db = MySQLdb.connect(**self._auth).cursor()
|
|
db.execute("drop database " + self._auth['db'])
|
|
db.close()
|
|
del db
|
|
|
|
def set_up_dummy_table(self, connection=None):
|
|
close_connection = False
|
|
if connection is None:
|
|
close_connection = True
|
|
if self.connection is None:
|
|
connection = MySQLdb.connect(**self._auth)
|
|
else:
|
|
connection = self.connection
|
|
|
|
cursor = connection.cursor()
|
|
cursor.execute(self.dummy_table_sql)
|
|
connection.commit()
|
|
cursor.close()
|
|
if close_connection:
|
|
connection.close()
|
|
|
|
dummy_table_sql = """CREATE TEMPORARY TABLE test_table
|
|
(
|
|
row_id INTEGER PRIMARY KEY AUTO_INCREMENT,
|
|
value_int INTEGER,
|
|
value_float FLOAT,
|
|
value_string VARCHAR(200),
|
|
value_uuid CHAR(36),
|
|
value_binary BLOB,
|
|
value_binary_string VARCHAR(200) BINARY,
|
|
value_enum ENUM('Y','N'),
|
|
created TIMESTAMP
|
|
) ENGINE=InnoDB;"""
|
|
|
|
def assert_cursor_yields(self, curs):
|
|
counter = [0]
|
|
|
|
def tick():
|
|
while True:
|
|
counter[0] += 1
|
|
eventlet.sleep()
|
|
gt = eventlet.spawn(tick)
|
|
curs.execute("select 1")
|
|
rows = curs.fetchall()
|
|
self.assertEqual(len(rows), 1)
|
|
self.assertEqual(len(rows[0]), 1)
|
|
self.assertEqual(rows[0][0], 1)
|
|
assert counter[0] > 0, counter[0]
|
|
gt.kill()
|
|
|
|
def assert_cursor_works(self, cursor):
|
|
cursor.execute("select 1")
|
|
rows = cursor.fetchall()
|
|
self.assertEqual(len(rows), 1)
|
|
self.assertEqual(len(rows[0]), 1)
|
|
self.assertEqual(rows[0][0], 1)
|
|
self.assert_cursor_yields(cursor)
|
|
|
|
def assert_connection_works(self, conn):
|
|
curs = conn.cursor()
|
|
self.assert_cursor_works(curs)
|
|
|
|
def test_module_attributes(self):
|
|
import MySQLdb as orig
|
|
for key in dir(orig):
|
|
if key not in ('__author__', '__path__', '__revision__',
|
|
'__version__', '__loader__'):
|
|
assert hasattr(MySQLdb, key), "%s %s" % (key, getattr(orig, key))
|
|
|
|
def test_connecting(self):
|
|
assert self.connection is not None
|
|
|
|
def test_connecting_annoyingly(self):
|
|
self.assert_connection_works(MySQLdb.Connect(**self._auth))
|
|
self.assert_connection_works(MySQLdb.Connection(**self._auth))
|
|
self.assert_connection_works(MySQLdb.connections.Connection(**self._auth))
|
|
|
|
def test_create_cursor(self):
|
|
cursor = self.connection.cursor()
|
|
cursor.close()
|
|
|
|
def test_run_query(self):
|
|
cursor = self.connection.cursor()
|
|
self.assert_cursor_works(cursor)
|
|
cursor.close()
|
|
|
|
def test_run_bad_query(self):
|
|
cursor = self.connection.cursor()
|
|
try:
|
|
cursor.execute("garbage blah blah")
|
|
assert False
|
|
except AssertionError:
|
|
raise
|
|
except Exception:
|
|
pass
|
|
cursor.close()
|
|
|
|
def fill_up_table(self, conn):
|
|
curs = conn.cursor()
|
|
for i in range(1000):
|
|
curs.execute('insert into test_table (value_int) values (%s)' % i)
|
|
conn.commit()
|
|
|
|
def test_yields(self):
|
|
conn = self.connection
|
|
self.set_up_dummy_table(conn)
|
|
self.fill_up_table(conn)
|
|
curs = conn.cursor()
|
|
results = []
|
|
SHORT_QUERY = "select * from test_table"
|
|
evt = event.Event()
|
|
|
|
def a_query():
|
|
self.assert_cursor_works(curs)
|
|
curs.execute(SHORT_QUERY)
|
|
results.append(2)
|
|
evt.send()
|
|
eventlet.spawn(a_query)
|
|
results.append(1)
|
|
self.assertEqual([1], results)
|
|
evt.wait()
|
|
self.assertEqual([1, 2], results)
|
|
|
|
def test_visibility_from_other_connections(self):
|
|
conn = MySQLdb.connect(**self._auth)
|
|
conn2 = MySQLdb.connect(**self._auth)
|
|
curs = conn.cursor()
|
|
try:
|
|
curs2 = conn2.cursor()
|
|
curs2.execute("insert into gargleblatz (a) values (%s)" % (314159))
|
|
self.assertEqual(curs2.rowcount, 1)
|
|
conn2.commit()
|
|
selection_query = "select * from gargleblatz"
|
|
curs2.execute(selection_query)
|
|
self.assertEqual(curs2.rowcount, 1)
|
|
del curs2, conn2
|
|
# create a new connection, it should see the addition
|
|
conn3 = MySQLdb.connect(**self._auth)
|
|
curs3 = conn3.cursor()
|
|
curs3.execute(selection_query)
|
|
self.assertEqual(curs3.rowcount, 1)
|
|
# now, does the already-open connection see it?
|
|
curs.execute(selection_query)
|
|
self.assertEqual(curs.rowcount, 1)
|
|
del curs3, conn3
|
|
finally:
|
|
# clean up my litter
|
|
curs.execute("delete from gargleblatz where a=314159")
|
|
conn.commit()
|
|
|
|
|
|
class TestMonkeyPatch(LimitedTestCase):
|
|
@skip_unless(mysql_requirement)
|
|
def test_monkey_patching(self):
|
|
testcode_path = os.path.join(
|
|
os.path.dirname(os.path.abspath(__file__)),
|
|
'mysqldb_test_monkey_patch.py',
|
|
)
|
|
output = run_python(testcode_path)
|
|
lines = output.splitlines()
|
|
self.assertEqual(len(lines), 2, output)
|
|
self.assertEqual(lines[0].replace("psycopg,", ""),
|
|
'mysqltest MySQLdb,os,select,socket,thread,time')
|
|
self.assertEqual(lines[1], "connect True")
|