245 lines
7.5 KiB
Python
245 lines
7.5 KiB
Python
from __future__ import print_function
|
|
|
|
import os
|
|
import time
|
|
import traceback
|
|
|
|
import eventlet
|
|
from eventlet import event
|
|
from tests import (
|
|
LimitedTestCase,
|
|
run_python,
|
|
skip_unless, using_pyevent, get_database_auth,
|
|
)
|
|
try:
|
|
from eventlet.green import MySQLdb
|
|
except ImportError:
|
|
MySQLdb = False
|
|
|
|
|
|
def mysql_requirement(_f):
|
|
"""We want to skip tests if using pyevent, MySQLdb is not installed, or if
|
|
there is no database running on the localhost that the auth file grants
|
|
us access to.
|
|
|
|
This errs on the side of skipping tests if everything is not right, but
|
|
it's better than a million tests failing when you don't care about mysql
|
|
support."""
|
|
if using_pyevent(_f):
|
|
return False
|
|
if MySQLdb is False:
|
|
print("Skipping mysql tests, MySQLdb not importable")
|
|
return False
|
|
try:
|
|
auth = get_database_auth()['MySQLdb'].copy()
|
|
MySQLdb.connect(**auth)
|
|
return True
|
|
except MySQLdb.OperationalError:
|
|
print("Skipping mysql tests, error when connecting:")
|
|
traceback.print_exc()
|
|
return False
|
|
|
|
|
|
class TestMySQLdb(LimitedTestCase):
|
|
def setUp(self):
|
|
super(TestMySQLdb, self).setUp()
|
|
|
|
self._auth = get_database_auth()['MySQLdb']
|
|
self.create_db()
|
|
self.connection = None
|
|
self.connection = MySQLdb.connect(**self._auth)
|
|
cursor = self.connection.cursor()
|
|
cursor.execute("""CREATE TABLE gargleblatz
|
|
(
|
|
a INTEGER
|
|
);""")
|
|
self.connection.commit()
|
|
cursor.close()
|
|
|
|
def tearDown(self):
|
|
if self.connection:
|
|
self.connection.close()
|
|
self.drop_db()
|
|
|
|
super(TestMySQLdb, self).tearDown()
|
|
|
|
@skip_unless(mysql_requirement)
|
|
def create_db(self):
|
|
auth = self._auth.copy()
|
|
try:
|
|
self.drop_db()
|
|
except Exception:
|
|
pass
|
|
dbname = 'test_%d_%d' % (os.getpid(), int(time.time() * 1000))
|
|
db = MySQLdb.connect(**auth).cursor()
|
|
db.execute("create database " + dbname)
|
|
db.close()
|
|
self._auth['db'] = dbname
|
|
del db
|
|
|
|
def drop_db(self):
|
|
db = MySQLdb.connect(**self._auth).cursor()
|
|
db.execute("drop database " + self._auth['db'])
|
|
db.close()
|
|
del db
|
|
|
|
def set_up_dummy_table(self, connection=None):
|
|
close_connection = False
|
|
if connection is None:
|
|
close_connection = True
|
|
if self.connection is None:
|
|
connection = MySQLdb.connect(**self._auth)
|
|
else:
|
|
connection = self.connection
|
|
|
|
cursor = connection.cursor()
|
|
cursor.execute(self.dummy_table_sql)
|
|
connection.commit()
|
|
cursor.close()
|
|
if close_connection:
|
|
connection.close()
|
|
|
|
dummy_table_sql = """CREATE TEMPORARY TABLE test_table
|
|
(
|
|
row_id INTEGER PRIMARY KEY AUTO_INCREMENT,
|
|
value_int INTEGER,
|
|
value_float FLOAT,
|
|
value_string VARCHAR(200),
|
|
value_uuid CHAR(36),
|
|
value_binary BLOB,
|
|
value_binary_string VARCHAR(200) BINARY,
|
|
value_enum ENUM('Y','N'),
|
|
created TIMESTAMP
|
|
) ENGINE=InnoDB;"""
|
|
|
|
def assert_cursor_yields(self, curs):
|
|
counter = [0]
|
|
|
|
def tick():
|
|
while True:
|
|
counter[0] += 1
|
|
eventlet.sleep()
|
|
gt = eventlet.spawn(tick)
|
|
curs.execute("select 1")
|
|
rows = curs.fetchall()
|
|
self.assertEqual(len(rows), 1)
|
|
self.assertEqual(len(rows[0]), 1)
|
|
self.assertEqual(rows[0][0], 1)
|
|
assert counter[0] > 0, counter[0]
|
|
gt.kill()
|
|
|
|
def assert_cursor_works(self, cursor):
|
|
cursor.execute("select 1")
|
|
rows = cursor.fetchall()
|
|
self.assertEqual(len(rows), 1)
|
|
self.assertEqual(len(rows[0]), 1)
|
|
self.assertEqual(rows[0][0], 1)
|
|
self.assert_cursor_yields(cursor)
|
|
|
|
def assert_connection_works(self, conn):
|
|
curs = conn.cursor()
|
|
self.assert_cursor_works(curs)
|
|
|
|
def test_module_attributes(self):
|
|
import MySQLdb as orig
|
|
for key in dir(orig):
|
|
if key not in ('__author__', '__path__', '__revision__',
|
|
'__version__', '__loader__'):
|
|
assert hasattr(MySQLdb, key), "%s %s" % (key, getattr(orig, key))
|
|
|
|
def test_connecting(self):
|
|
assert self.connection is not None
|
|
|
|
def test_connecting_annoyingly(self):
|
|
self.assert_connection_works(MySQLdb.Connect(**self._auth))
|
|
self.assert_connection_works(MySQLdb.Connection(**self._auth))
|
|
self.assert_connection_works(MySQLdb.connections.Connection(**self._auth))
|
|
|
|
def test_create_cursor(self):
|
|
cursor = self.connection.cursor()
|
|
cursor.close()
|
|
|
|
def test_run_query(self):
|
|
cursor = self.connection.cursor()
|
|
self.assert_cursor_works(cursor)
|
|
cursor.close()
|
|
|
|
def test_run_bad_query(self):
|
|
cursor = self.connection.cursor()
|
|
try:
|
|
cursor.execute("garbage blah blah")
|
|
assert False
|
|
except AssertionError:
|
|
raise
|
|
except Exception:
|
|
pass
|
|
cursor.close()
|
|
|
|
def fill_up_table(self, conn):
|
|
curs = conn.cursor()
|
|
for i in range(1000):
|
|
curs.execute('insert into test_table (value_int) values (%s)' % i)
|
|
conn.commit()
|
|
|
|
def test_yields(self):
|
|
conn = self.connection
|
|
self.set_up_dummy_table(conn)
|
|
self.fill_up_table(conn)
|
|
curs = conn.cursor()
|
|
results = []
|
|
SHORT_QUERY = "select * from test_table"
|
|
evt = event.Event()
|
|
|
|
def a_query():
|
|
self.assert_cursor_works(curs)
|
|
curs.execute(SHORT_QUERY)
|
|
results.append(2)
|
|
evt.send()
|
|
eventlet.spawn(a_query)
|
|
results.append(1)
|
|
self.assertEqual([1], results)
|
|
evt.wait()
|
|
self.assertEqual([1, 2], results)
|
|
|
|
def test_visibility_from_other_connections(self):
|
|
conn = MySQLdb.connect(**self._auth)
|
|
conn2 = MySQLdb.connect(**self._auth)
|
|
curs = conn.cursor()
|
|
try:
|
|
curs2 = conn2.cursor()
|
|
curs2.execute("insert into gargleblatz (a) values (%s)" % (314159))
|
|
self.assertEqual(curs2.rowcount, 1)
|
|
conn2.commit()
|
|
selection_query = "select * from gargleblatz"
|
|
curs2.execute(selection_query)
|
|
self.assertEqual(curs2.rowcount, 1)
|
|
del curs2, conn2
|
|
# create a new connection, it should see the addition
|
|
conn3 = MySQLdb.connect(**self._auth)
|
|
curs3 = conn3.cursor()
|
|
curs3.execute(selection_query)
|
|
self.assertEqual(curs3.rowcount, 1)
|
|
# now, does the already-open connection see it?
|
|
curs.execute(selection_query)
|
|
self.assertEqual(curs.rowcount, 1)
|
|
del curs3, conn3
|
|
finally:
|
|
# clean up my litter
|
|
curs.execute("delete from gargleblatz where a=314159")
|
|
conn.commit()
|
|
|
|
|
|
class TestMonkeyPatch(LimitedTestCase):
|
|
@skip_unless(mysql_requirement)
|
|
def test_monkey_patching(self):
|
|
testcode_path = os.path.join(
|
|
os.path.dirname(os.path.abspath(__file__)),
|
|
'mysqldb_test_monkey_patch.py',
|
|
)
|
|
output = run_python(testcode_path)
|
|
lines = output.splitlines()
|
|
self.assertEqual(len(lines), 2, output)
|
|
self.assertEqual(lines[0].replace("psycopg,", ""),
|
|
'mysqltest MySQLdb,os,select,socket,thread,time')
|
|
self.assertEqual(lines[1], "connect True")
|