tests: upgrade plugin tests to new class. Test against mysql-5.6 too

This commit is contained in:
Daniel Black
2015-08-25 19:58:35 +10:00
parent 0a1763fb39
commit 4cc3ca7ed2
4 changed files with 203 additions and 107 deletions

View File

@@ -1,4 +1,4 @@
[
{"host": "localhost", "user": "root", "passwd": "", "db": "test_pymysql", "use_unicode": true, "local_infile": true},
{"host": "localhost", "user": "root", "passwd": "", "db": "test_pymysql2" }
{"host": "localhost", "unix_socket": "/var/run/mysqld/mysqld.sock", "user": "root", "passwd": "", "db": "test_pymysql", "use_unicode": true, "local_infile": true},
{"host": "localhost", "port": 3306, "user": "root", "passwd": "", "db": "test_pymysql2" }
]

View File

@@ -1,6 +1,10 @@
sudo: false
language: python
python: "3.4"
cache:
directories:
- mysql
env:
matrix:
- TOX_ENV=py26
@@ -21,14 +25,48 @@ matrix:
- addons:
mariadb: 10.1
env: TOX_ENV=py34
- env:
- TOX_ENV=py34
- DB=5.6.26
addons:
apt:
packages:
- libaio-dev
# really only need libaio1 however libaio-dev is whitelisted
#
# http://dev.mysql.com/downloads/mysql/5.7.html
# - env:
# - TOX_ENV=py34
# - DB=5.7.8-rc
before_install:
install:
- pip install -U tox
before_script:
- if [ ! -z "${DB}" ]; then
F=mysql-${DB}-linux-glibc2.5-x86_64;
mkdir -p ${HOME}/mysql;
P=${HOME}/mysql/${F} ;
if [ ! -d "${P}" ]; then
wget http://cdn.mysql.com/Downloads/MySQL-${DB%.*}/${F}.tar.gz -O - | tar -zxf - --directory=${HOME}/mysql ;
fi;
${P}/scripts/mysql_install_db --defaults-file=${P}/my.cnf --basedir=${P} --datadir=${HOME}/db-"${DB}" --log-error=/tmp/mysql.err;
${P}/bin/mysqld_safe --defaults-file=${P}/my.cnf --ledir=/ --mysqld=${P}/bin/mysqld --datadir=${HOME}/db-${DB} --socket=/tmp/mysql.sock --port 3307 --innodb-buffer-pool-size=200M --lc-messages-dir=${P}/share --plugin-dir=${P}/lib/plugin/ --log-error=/tmp/mysql.err &
sleep 5; cat /tmp/mysql.err; df -h;
mysql -S /tmp/mysql.sock "create user ${USER}@localhost; create user ${USER}@'%'; grant all on *.* to ${USER}@localhost WITH GRANT OPTION;grant all on *.* to ${USER}@'%' WITH GRANT OPTION;";
echo 'check we are talking about the right sed' 1>&2 ;
sed -e 's/3306/3307/g' -e 's:/var/run/mysqld/mysqld.sock:/tmp/mysql.sock:g' .travis.databases.json > pymysql/tests/databases.json;
echo -e '[client]\nsocket = /tmp/mysql.sock\n' > "${HOME}"/.my.cnf ;
else
cp .travis.databases.json pymysql/tests/databases.json;
fi
- "mysql -e 'create database test_pymysql DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;'"
- "mysql -e 'create database test_pymysql2 DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;'"
- "mysql -e 'select VERSION();'"
- cp .travis.databases.json pymysql/tests/databases.json
script: tox -e $TOX_ENV
script:
- tox -e $TOX_ENV
after_failure:
- cat /tmp/mysql.err

View File

@@ -2,11 +2,170 @@ import datetime
import decimal
import pymysql
import time
import os
import unittest2
from pymysql.tests import base
class TempUser:
def __init__(self, c, user, db, auth, authdata=None):
self._c = c
self._user = user
self._db = db
create = "CREATE USER %s" \
" IDENTIFIED WITH %s" % (user, auth)
if authdata is not None:
create += " AS '%s'" % authdata
try:
c.execute(create)
self._created = True
except pymysql.err.InternalError:
# already exists - TODO need to check the same plugin applies
self._created = False
try:
c.execute("GRANT SELECT ON %s.* TO %s" % (db, user))
self._grant = True
except pymysql.err.InternalError:
self._grant = False
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
if self._grant:
self._c.execute("REVOKE SELECT ON %s.* FROM %s" % (self._db, self._user))
if self._created:
self._c.execute("DROP USER %s" % self._user)
class TestAuthentication(base.PyMySQLTestCase):
socket_auth = False
socket_found = False
two_questions_found = False
three_attempts_found = False
pam_found = False
import os
osuser = os.environ.get('USER')
# socket auth requires the current user and for the connection to be a socket
# rest do grants @localhost due to incomplete logic - TODO change to @% then
db = base.PyMySQLTestCase.databases[0].copy()
socket_auth = db.get('unix_socket') is not None \
and db.get('host') in ('localhost', '127.0.0.1')
cur = pymysql.connect(**db).cursor()
del db['user']
cur.execute("SHOW PLUGINS")
for r in cur:
if (r[1], r[2], r[3]) == (u'ACTIVE', u'AUTHENTICATION', u'auth_socket.so'):
socket_plugin_name = r[0]
socket_found = True
if (r[1], r[2], r[3]) == (u'ACTIVE', u'AUTHENTICATION', u'dialog_examples.so'):
if r[0] == 'two_questions':
two_questions_found = True
elif r[0] == 'three_attempts':
three_attempts_found = True
if (r[0], r[1], r[2]) == (u'pam', u'ACTIVE', u'AUTHENTICATION'):
pam_found = True
pam_plugin_name = r[3].split('.')[0]
if pam_plugin_name == 'auth_pam':
pam_plugin_name = 'pam'
# MySQL: authentication_pam
# https://dev.mysql.com/doc/refman/5.5/en/pam-authentication-plugin.html
# MariaDB: pam
# https://mariadb.com/kb/en/mariadb/pam-authentication-plugin/
# Names differ but functionality is close
def test_plugin(self):
# Bit of an assumption that the current user is a native password
self.assertEqual('mysql_native_password', self.connections[0].get_plugin_name())
@unittest2.skipUnless(socket_auth, "connection to unix_socket required")
@unittest2.skipIf(socket_found, "socket plugin already installed")
def testSocketAuthInstallPlugin(self):
# needs plugin. lets install it.
cur = self.connections[0].cursor()
try:
cur.execute("install plugin auth_socket soname 'auth_socket.so'")
TestAuthentication.socket_found = True
self.socket_plugin_name = 'auth_socket'
self.realtestSocketAuth()
except pymysql.err.InternalError:
try:
cur.execute("install soname 'auth_socket'")
TestAuthentication.socket_found = True
self.socket_plugin_name = 'unix_socket'
self.realtestSocketAuth()
except pymysql.err.InternalError:
TestAuthentication.socket_found = False
raise unittest2.SkipTest('we couldn\'t install the socket plugin')
finally:
if TestAuthentication.socket_found:
cur = self.connections[0].cursor()
cur.execute("uninstall plugin %s" % self.socket_plugin_name)
@unittest2.skipUnless(socket_auth, "connection to unix_socket required")
@unittest2.skipUnless(socket_found, "no socket plugin")
def testSocketAuth(self):
self.realtestSocketAuth()
def realtestSocketAuth(self):
with TempUser(self.connections[0].cursor(), TestAuthentication.osuser + '@localhost',
self.databases[0]['db'], self.socket_plugin_name) as u:
c = pymysql.connect(user=TestAuthentication.osuser, **self.db)
class Dialog(object):
fail=False
def __init__(self, con):
self.fail=TestAuthentication.Dialog.fail
pass
def prompt(self, echo, prompt):
if self.fail:
self.fail=False
return 'bad guess at a password'
return self.m.get(prompt)
@unittest2.skipUnless(socket_auth, "connection to unix_socket required")
@unittest2.skipUnless(two_questions_found, "no two questions auth plugin")
def testDialogAuthTwoQuestions(self):
TestAuthentication.Dialog.fail=False
TestAuthentication.Dialog.m = {'Password, please:': b'notverysecret',
'Are you sure ?': b'yes, of course'}
with TempUser(self.connections[0].cursor(), 'pymysql_test_two_questions@localhost',
self.databases[0]['db'], 'two_questions', 'notverysecret') as u:
pymysql.connect(user='pymysql_test_two_questions', plugin_map={b'dialog': TestAuthentication.Dialog}, **self.db)
@unittest2.skipUnless(socket_auth, "connection to unix_socket required")
@unittest2.skipUnless(three_attempts_found, "no three attempts plugin")
def testDialogAuthThreeAttempts(self):
TestAuthentication.Dialog.m = {'Password, please:': b'stillnotverysecret'}
TestAuthentication.Dialog.fail=True # fail just once. We've got three attempts after all
with TempUser(self.connections[0].cursor(), 'pymysql_test_three_attempts@localhost',
self.databases[0]['db'], 'three_attempts', 'stillnotverysecret') as u:
pymysql.connect(user='pymysql_test_three_attempts', plugin_map={b'dialog': TestAuthentication.Dialog}, **self.db)
@unittest2.skipUnless(socket_auth, "connection to unix_socket required")
@unittest2.skipUnless(pam_found, "no pam plugin")
def testPamAuth(self):
db = self.db.copy()
db['password'] = 'bad guess at password'
with TempUser(self.connections[0].cursor(), TestAuthentication.osuser + '@localhost',
self.databases[0]['db'], self.pam_plugin_name) as u:
try:
c = pymysql.connect(user=TestAuthentication.osuser, **db)
except pymysql.OperationalError as e:
self.assertEqual(1045, e.args[0])
return
# else we had 'bad guess at password' work with pam. Well cool
class TestConnection(base.PyMySQLTestCase):
def test_utf8mb4(self):
"""This test requires MySQL >= 5.5"""
arg = self.databases[0].copy()
@@ -75,108 +234,6 @@ class TestConnection(base.PyMySQLTestCase):
self.assertEqual(('foobar',), c.fetchone())
conn.close()
def test_plugin(self):
con = self.connections[0]
self.assertEqual('mysql_native_password', con.get_plugin_name())
# attempt a unix socket test which is included in some versions
# and doesn't require a client side handler
user = os.environ.get('USER')
if not user or self.databases[0]['host'] != 'localhost':
return
cur = con.cursor()
cur.execute("SHOW PLUGINS")
socketfound = False
socket_added = False
two = three = False
pam_plugin = False
for r in cur:
if (r[1], r[2], r[3]) == (u'ACTIVE', u'AUTHENTICATION', u'auth_socket.so'):
socket_plugin_name = r[0]
socketfound = True
if (r[1], r[2], r[3]) == (u'ACTIVE', u'AUTHENTICATION', u'dialog_examples.so'):
if r[0] == 'two_questions':
two=True
elif r[0] == 'three_attempts':
three=True
socketfound = True
if (r[0], r[1], r[2]) == (u'pam', u'ACTIVE', u'AUTHENTICATION'):
pam_plugin = r[3].split('.')[0]
if pam_plugin == 'auth_pam':
pam_plugin = 'pam'
# MySQL: authentication_pam
# https://dev.mysql.com/doc/refman/5.5/en/pam-authentication-plugin.html
# MariaDB: pam
# https://mariadb.com/kb/en/mariadb/pam-authentication-plugin/
# uses client plugin 'dialog' by default however 'mysql_cleartext_password' if
# variable pam-use-cleartext-plugin enabled
if not socketfound:
# needs plugin. lets install it.
try:
cur.execute("install plugin auth_socket soname 'auth_socket.so'")
socket_plugin_name = 'auth_socket'
socket_added = True
except pymysql.err.InternalError:
cur.execute("install soname 'auth_socket'")
socket_plugin_name = 'unix_socket'
socket_added = True
db = self.databases[0].copy()
del db['user']
current_db = db['db']
if socketfound or socket_added:
try:
cur.execute("CREATE USER %s@localhost IDENTIFIED WITH %s" % ( user, socket_plugin_name))
socket_user_added = True
except pymysql.err.InternalError:
# user already exists
socket_user_added = False
cur.execute("GRANT SELECT ON %s.* TO %s@localhost" % ( current_db, user))
c = pymysql.connect(user=user, **db)
if socket_added:
cur.execute("uninstall soname 'auth_socket'")
if socket_user_added:
cur.execute("DROP USER %s@localhost" % user)
class Dialog(object):
m = {'Password, please:': b'notverysecret',
'Are you sure ?': b'yes, of course'}
fail=False
def __init__(self, con):
self.con=con
def prompt(self, echo, prompt):
if self.fail:
self.fail=False
return 'bad guess'
return self.m.get(prompt)
if two:
cur.execute("CREATE USER pymysql_test_two_questions" \
" IDENTIFIED WITH two_questions" \
" AS 'notverysecret'")
cur.execute("GRANT SELECT ON %s.* TO pymysql_test_two_questions" % current_db)
c = pymysql.connect(user='pymysql_test_two_questions', plugin_map={b'dialog': Dialog}, **db)
cur.execute("DROP USER pymysql_test_two_questions")
if three:
Dialog.m = {'Password, please:': b'stillnotverysecret'}
Dialog.fail=True # fail just once. We've got three attempts after all
cur.execute("CREATE USER pymysql_test_three_attempts"
" IDENTIFIED WITH three_attempts" \
" AS 'stillnotverysecret'")
cur.execute("GRANT SELECT ON %s.* TO pymysql_test_three_attempts" % current_db)
c = pymysql.connect(user='pymysql_test_three_attempts', plugin_map={b'dialog': Dialog}, **db)
cur.execute("DROP USER pymysql_test_three_attempts")
if pam_plugin:
cur.execute("CREATE USER %s IDENTIFIED WITH %s" % ( user, pam_plugin))
cur.execute("GRANT ALL ON %s TO %s" % ( current_db, user))
c = pymysql.connect(user=user, **db)
cur.execute("DROP USER %s" % user)
# A custom type and function to escape it
class Foo(object):

View File

@@ -4,3 +4,4 @@ envlist = py26,py27,py33,py34,pypy,pypy3
[testenv]
commands = ./runtests.py
deps = unittest2
passenv = USER