add PAM plugin to test suite
This commit is contained in:
@@ -3,6 +3,7 @@ branch = True
|
|||||||
source =
|
source =
|
||||||
pymysql
|
pymysql
|
||||||
omit = pymysql/test/*
|
omit = pymysql/test/*
|
||||||
|
pymysql/tests/thirdparty/test_MySQLdb/*
|
||||||
|
|
||||||
|
|
||||||
[report]
|
[report]
|
||||||
|
|||||||
13
.travis.yml
13
.travis.yml
@@ -2,8 +2,7 @@ sudo: false
|
|||||||
language: python
|
language: python
|
||||||
python: "3.4"
|
python: "3.4"
|
||||||
cache:
|
cache:
|
||||||
directories:
|
- pip
|
||||||
- mysql
|
|
||||||
|
|
||||||
env:
|
env:
|
||||||
matrix:
|
matrix:
|
||||||
@@ -27,6 +26,7 @@ matrix:
|
|||||||
env:
|
env:
|
||||||
- TOX_ENV=py33
|
- TOX_ENV=py33
|
||||||
- EXTRAPKG=mariadb-test
|
- EXTRAPKG=mariadb-test
|
||||||
|
- PAMCLEAR=1
|
||||||
sudo: required
|
sudo: required
|
||||||
- addons:
|
- addons:
|
||||||
mariadb: 10.1
|
mariadb: 10.1
|
||||||
@@ -41,6 +41,9 @@ matrix:
|
|||||||
apt:
|
apt:
|
||||||
packages:
|
packages:
|
||||||
- libaio-dev
|
- libaio-dev
|
||||||
|
cache:
|
||||||
|
directories:
|
||||||
|
- mysql
|
||||||
# really only need libaio1 however libaio-dev is whitelisted
|
# really only need libaio1 however libaio-dev is whitelisted
|
||||||
#
|
#
|
||||||
# http://dev.mysql.com/downloads/mysql/5.7.html
|
# http://dev.mysql.com/downloads/mysql/5.7.html
|
||||||
@@ -52,6 +55,10 @@ install:
|
|||||||
- if [ -n "${EXTRAPKG}" ]; then
|
- if [ -n "${EXTRAPKG}" ]; then
|
||||||
sudo apt-get install ${EXTRAPKG};
|
sudo apt-get install ${EXTRAPKG};
|
||||||
fi
|
fi
|
||||||
|
- if [ -n "${PAMCLEAR}" ]; then
|
||||||
|
echo -e '[mysqld]\n\nplugin-load=auth_pam.so\npam-use-cleartext-plugin' | sudo tee -a /etc/mysql/pam-cleartext.cnf;
|
||||||
|
sudo service mysql restart;
|
||||||
|
fi
|
||||||
- pip install -U tox coveralls
|
- pip install -U tox coveralls
|
||||||
|
|
||||||
before_script:
|
before_script:
|
||||||
@@ -80,6 +87,8 @@ before_script:
|
|||||||
- export COVERALLS_PARALLEL=true
|
- export COVERALLS_PARALLEL=true
|
||||||
|
|
||||||
script:
|
script:
|
||||||
|
- export PAMSERVICE=chfn
|
||||||
|
- export PASSWORD=travis
|
||||||
- tox -e $TOX_ENV
|
- tox -e $TOX_ENV
|
||||||
|
|
||||||
after_success:
|
after_success:
|
||||||
|
|||||||
@@ -165,6 +165,9 @@ class TestAuthentication(base.PyMySQLTestCase):
|
|||||||
break
|
break
|
||||||
return pkt
|
return pkt
|
||||||
|
|
||||||
|
class DefectiveHandler(object):
|
||||||
|
def __init__(self, con):
|
||||||
|
self.con=con
|
||||||
|
|
||||||
|
|
||||||
@unittest2.skipUnless(socket_auth, "connection to unix_socket required")
|
@unittest2.skipUnless(socket_auth, "connection to unix_socket required")
|
||||||
@@ -227,11 +230,8 @@ class TestAuthentication(base.PyMySQLTestCase):
|
|||||||
with self.assertRaises(pymysql.err.OperationalError):
|
with self.assertRaises(pymysql.err.OperationalError):
|
||||||
pymysql.connect(user='pymysql_3a', plugin_map={b'dialog': object}, **self.db)
|
pymysql.connect(user='pymysql_3a', plugin_map={b'dialog': object}, **self.db)
|
||||||
|
|
||||||
class DefectiveHandler(object):
|
|
||||||
def __init__(self, con):
|
|
||||||
self.con=con
|
|
||||||
with self.assertRaises(pymysql.err.OperationalError):
|
with self.assertRaises(pymysql.err.OperationalError):
|
||||||
pymysql.connect(user='pymysql_3a', plugin_map={b'dialog': DefectiveHandler}, **self.db)
|
pymysql.connect(user='pymysql_3a', plugin_map={b'dialog': TestAuthentication.DefectiveHandler}, **self.db)
|
||||||
with self.assertRaises(pymysql.err.OperationalError):
|
with self.assertRaises(pymysql.err.OperationalError):
|
||||||
pymysql.connect(user='pymysql_3a', plugin_map={b'notdialogplugin': TestAuthentication.Dialog}, **self.db)
|
pymysql.connect(user='pymysql_3a', plugin_map={b'notdialogplugin': TestAuthentication.Dialog}, **self.db)
|
||||||
TestAuthentication.Dialog.m = {b'Password, please:': b'I do not know'}
|
TestAuthentication.Dialog.m = {b'Password, please:': b'I do not know'}
|
||||||
@@ -241,19 +241,46 @@ class TestAuthentication(base.PyMySQLTestCase):
|
|||||||
with self.assertRaises(pymysql.err.OperationalError):
|
with self.assertRaises(pymysql.err.OperationalError):
|
||||||
pymysql.connect(user='pymysql_3a', plugin_map={b'dialog': TestAuthentication.Dialog}, **self.db)
|
pymysql.connect(user='pymysql_3a', plugin_map={b'dialog': TestAuthentication.Dialog}, **self.db)
|
||||||
|
|
||||||
|
@unittest2.skipUnless(socket_auth, "connection to unix_socket required")
|
||||||
|
@unittest2.skipIf(pam_found, "pam plugin already installed")
|
||||||
|
@unittest2.skipIf(os.environ.get('PASSWORD') is None, "PASSWORD env var required")
|
||||||
|
@unittest2.skipIf(os.environ.get('PAMSERVICE') is None, "PAMSERVICE env var required")
|
||||||
|
def testPamAuthInstallPlugin(self):
|
||||||
|
# needs plugin. lets install it.
|
||||||
|
cur = self.connections[0].cursor()
|
||||||
|
try:
|
||||||
|
cur.execute("install plugin pam soname 'auth_pam.so'")
|
||||||
|
TestAuthentication.pam_found = True
|
||||||
|
self.realTestPamAuth()
|
||||||
|
except pymysql.err.InternalError:
|
||||||
|
raise unittest2.SkipTest('we couldn\'t install the auth_pam plugin')
|
||||||
|
finally:
|
||||||
|
if TestAuthentication.pam_found:
|
||||||
|
cur.execute("uninstall plugin pam")
|
||||||
|
|
||||||
|
|
||||||
@unittest2.skipUnless(socket_auth, "connection to unix_socket required")
|
@unittest2.skipUnless(socket_auth, "connection to unix_socket required")
|
||||||
@unittest2.skipUnless(pam_found, "no pam plugin")
|
@unittest2.skipUnless(pam_found, "no pam plugin")
|
||||||
|
@unittest2.skipIf(os.environ.get('PASSWORD') is None, "PASSWORD env var required")
|
||||||
|
@unittest2.skipIf(os.environ.get('PAMSERVICE') is None, "PAMSERVICE env var required")
|
||||||
def testPamAuth(self):
|
def testPamAuth(self):
|
||||||
|
self.realTestPamAuth()
|
||||||
|
|
||||||
|
def realTestPamAuth(self):
|
||||||
db = self.db.copy()
|
db = self.db.copy()
|
||||||
db['password'] = b'bad guess at password'
|
import os
|
||||||
|
db['password'] = os.environ.get('PASSWORD')
|
||||||
|
|
||||||
with TempUser(self.connections[0].cursor(), TestAuthentication.osuser + '@localhost',
|
with TempUser(self.connections[0].cursor(), TestAuthentication.osuser + '@localhost',
|
||||||
self.databases[0]['db'], self.pam_plugin_name) as u:
|
self.databases[0]['db'], 'pam', os.environ.get('PAMSERVICE')) as u:
|
||||||
try:
|
try:
|
||||||
c = pymysql.connect(user=TestAuthentication.osuser, **db)
|
c = pymysql.connect(user=TestAuthentication.osuser, **db)
|
||||||
except pymysql.OperationalError as e:
|
except pymysql.OperationalError as e:
|
||||||
self.assertEqual(1045, e.args[0])
|
self.assertEqual(1045, e.args[0])
|
||||||
return
|
return
|
||||||
# else we had 'bad guess at password' work with pam. Well cool
|
# else we had 'bad guess at password' work with pam. Well cool
|
||||||
|
with self.assertRaises(pymysql.err.OperationalError):
|
||||||
|
pymysql.connect(user=TestAuthentication.osuser + '@localhost', plugin_map={b'mysql_cleartext_password': TestAuthentication.DefectiveHandler}, **self.db)
|
||||||
|
|
||||||
# select old_password("crummy p\tassword");
|
# select old_password("crummy p\tassword");
|
||||||
#| old_password("crummy p\tassword") |
|
#| old_password("crummy p\tassword") |
|
||||||
|
|||||||
Reference in New Issue
Block a user