plugin sha256 - cannot handle yet
This commit is contained in:
@@ -2,7 +2,7 @@
|
||||
branch = True
|
||||
source =
|
||||
pymysql
|
||||
omit = pymysql/test
|
||||
omit = pymysql/test/*
|
||||
|
||||
|
||||
[report]
|
||||
|
||||
@@ -62,6 +62,8 @@ before_script:
|
||||
if [ ! -d "${P}" ]; then
|
||||
wget http://cdn.mysql.com/Downloads/MySQL-${DB%.*}/${F}.tar.gz -O - | tar -zxf - --directory=${HOME}/mysql ;
|
||||
fi;
|
||||
openssl genrsa -out "${P}"/private_key.pem 2048;
|
||||
openssl rsa -in "${P}"/private_key.pem -pubout -out "${P}"/public_key.pem;
|
||||
${P}/scripts/mysql_install_db --defaults-file=${P}/my.cnf --basedir=${P} --datadir=${HOME}/db-"${DB}" --log-error=/tmp/mysql.err;
|
||||
${P}/bin/mysqld_safe --defaults-file=${P}/my.cnf --ledir=/ --mysqld=${P}/bin/mysqld --datadir=${HOME}/db-${DB} --socket=/tmp/mysql.sock --port 3307 --innodb-buffer-pool-size=200M --lc-messages-dir=${P}/share --plugin-dir=${P}/lib/plugin/ --log-error=/tmp/mysql.err &
|
||||
sleep 5; cat /tmp/mysql.err; df -h;
|
||||
|
||||
@@ -48,6 +48,7 @@ class TestAuthentication(base.PyMySQLTestCase):
|
||||
three_attempts_found = False
|
||||
pam_found = False
|
||||
mysql_old_password_found = False
|
||||
sha256_password_found = False
|
||||
|
||||
import os
|
||||
osuser = os.environ.get('USER')
|
||||
@@ -63,15 +64,17 @@ class TestAuthentication(base.PyMySQLTestCase):
|
||||
del db['user']
|
||||
cur.execute("SHOW PLUGINS")
|
||||
for r in cur:
|
||||
if (r[1], r[2], r[3]) == (u'ACTIVE', u'AUTHENTICATION', u'auth_socket.so'):
|
||||
if (r[1], r[2]) != (u'ACTIVE', u'AUTHENTICATION'):
|
||||
continue
|
||||
if r[3] == u'auth_socket.so':
|
||||
socket_plugin_name = r[0]
|
||||
socket_found = True
|
||||
elif (r[1], r[2], r[3]) == (u'ACTIVE', u'AUTHENTICATION', u'dialog_examples.so'):
|
||||
elif r[3] == u'dialog_examples.so':
|
||||
if r[0] == 'two_questions':
|
||||
two_questions_found = True
|
||||
elif r[0] == 'three_attempts':
|
||||
three_attempts_found = True
|
||||
elif (r[0], r[1], r[2]) == (u'pam', u'ACTIVE', u'AUTHENTICATION'):
|
||||
elif r[0] == u'pam':
|
||||
pam_found = True
|
||||
pam_plugin_name = r[3].split('.')[0]
|
||||
if pam_plugin_name == 'auth_pam':
|
||||
@@ -83,8 +86,12 @@ class TestAuthentication(base.PyMySQLTestCase):
|
||||
# https://mariadb.com/kb/en/mariadb/pam-authentication-plugin/
|
||||
|
||||
# Names differ but functionality is close
|
||||
elif (r[0], r[1], r[2]) == (u'mysql_old_password', u'ACTIVE', u'AUTHENTICATION'):
|
||||
elif r[0] == u'mysql_old_password':
|
||||
mysql_old_password_found = True
|
||||
elif r[0] == u'sha256_password':
|
||||
sha256_password_found = True
|
||||
#else:
|
||||
# print("plugin: %r" % r[0])
|
||||
|
||||
def test_plugin(self):
|
||||
# Bit of an assumption that the current user is a native password
|
||||
@@ -111,7 +118,6 @@ class TestAuthentication(base.PyMySQLTestCase):
|
||||
raise unittest2.SkipTest('we couldn\'t install the socket plugin')
|
||||
finally:
|
||||
if TestAuthentication.socket_found:
|
||||
cur = self.connections[0].cursor()
|
||||
cur.execute("uninstall plugin %s" % self.socket_plugin_name)
|
||||
|
||||
@unittest2.skipUnless(socket_auth, "connection to unix_socket required")
|
||||
@@ -174,7 +180,6 @@ class TestAuthentication(base.PyMySQLTestCase):
|
||||
raise unittest2.SkipTest('we couldn\'t install the two_questions plugin')
|
||||
finally:
|
||||
if TestAuthentication.two_questions_found:
|
||||
cur = self.connections[0].cursor()
|
||||
cur.execute("uninstall plugin two_questions")
|
||||
|
||||
@unittest2.skipUnless(socket_auth, "connection to unix_socket required")
|
||||
@@ -205,7 +210,6 @@ class TestAuthentication(base.PyMySQLTestCase):
|
||||
raise unittest2.SkipTest('we couldn\'t install the three_attempts plugin')
|
||||
finally:
|
||||
if TestAuthentication.three_attempts_found:
|
||||
cur = self.connections[0].cursor()
|
||||
cur.execute("uninstall plugin three_attempts")
|
||||
|
||||
@unittest2.skipUnless(socket_auth, "connection to unix_socket required")
|
||||
@@ -295,6 +299,20 @@ class TestAuthentication(base.PyMySQLTestCase):
|
||||
cur.execute("SELECT VERSION()")
|
||||
c.execute('set global secure_auth=%r' % secure_auth_setting)
|
||||
|
||||
@unittest2.skipUnless(socket_auth, "connection to unix_socket required")
|
||||
@unittest2.skipUnless(sha256_password_found, "no sha256 password authentication plugin found")
|
||||
def testAuthSHA256(self):
|
||||
c = self.connections[0].cursor()
|
||||
with TempUser(c, 'pymysql_sha256@localhost',
|
||||
self.databases[0]['db'], 'sha256_password') as u:
|
||||
c.execute('SET old_passwords = 2')
|
||||
c.execute("SET PASSWORD FOR 'pymysql_sha256'@'localhost' = PASSWORD('Sh@256Pa33')")
|
||||
db = self.db.copy()
|
||||
db['password'] = "Sh@256Pa33"
|
||||
# not implemented yet so thows error
|
||||
with self.assertRaises(pymysql.err.OperationalError):
|
||||
pymysql.connect(user='pymysql_256', **db)
|
||||
|
||||
class TestConnection(base.PyMySQLTestCase):
|
||||
|
||||
def test_utf8mb4(self):
|
||||
|
||||
Reference in New Issue
Block a user