From bcea1a6ee3d8b1e301fe43d62786b8295b142e90 Mon Sep 17 00:00:00 2001 From: Daniel Black Date: Sat, 29 Aug 2015 21:26:29 +1000 Subject: [PATCH] plugin sha256 - cannot handle yet --- .coveragerc | 2 +- .travis.yml | 2 ++ pymysql/tests/test_connection.py | 32 +++++++++++++++++++++++++------- 3 files changed, 28 insertions(+), 8 deletions(-) diff --git a/.coveragerc b/.coveragerc index e0a1b7b..33ae718 100644 --- a/.coveragerc +++ b/.coveragerc @@ -2,7 +2,7 @@ branch = True source = pymysql -omit = pymysql/test +omit = pymysql/test/* [report] diff --git a/.travis.yml b/.travis.yml index 7f093f1..7099da0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -62,6 +62,8 @@ before_script: if [ ! -d "${P}" ]; then wget http://cdn.mysql.com/Downloads/MySQL-${DB%.*}/${F}.tar.gz -O - | tar -zxf - --directory=${HOME}/mysql ; fi; + openssl genrsa -out "${P}"/private_key.pem 2048; + openssl rsa -in "${P}"/private_key.pem -pubout -out "${P}"/public_key.pem; ${P}/scripts/mysql_install_db --defaults-file=${P}/my.cnf --basedir=${P} --datadir=${HOME}/db-"${DB}" --log-error=/tmp/mysql.err; ${P}/bin/mysqld_safe --defaults-file=${P}/my.cnf --ledir=/ --mysqld=${P}/bin/mysqld --datadir=${HOME}/db-${DB} --socket=/tmp/mysql.sock --port 3307 --innodb-buffer-pool-size=200M --lc-messages-dir=${P}/share --plugin-dir=${P}/lib/plugin/ --log-error=/tmp/mysql.err & sleep 5; cat /tmp/mysql.err; df -h; diff --git a/pymysql/tests/test_connection.py b/pymysql/tests/test_connection.py index 8fe9011..e78621e 100644 --- a/pymysql/tests/test_connection.py +++ b/pymysql/tests/test_connection.py @@ -48,6 +48,7 @@ class TestAuthentication(base.PyMySQLTestCase): three_attempts_found = False pam_found = False mysql_old_password_found = False + sha256_password_found = False import os osuser = os.environ.get('USER') @@ -63,15 +64,17 @@ class TestAuthentication(base.PyMySQLTestCase): del db['user'] cur.execute("SHOW PLUGINS") for r in cur: - if (r[1], r[2], r[3]) == (u'ACTIVE', u'AUTHENTICATION', u'auth_socket.so'): + if (r[1], r[2]) != (u'ACTIVE', u'AUTHENTICATION'): + continue + if r[3] == u'auth_socket.so': socket_plugin_name = r[0] socket_found = True - elif (r[1], r[2], r[3]) == (u'ACTIVE', u'AUTHENTICATION', u'dialog_examples.so'): + elif r[3] == u'dialog_examples.so': if r[0] == 'two_questions': two_questions_found = True elif r[0] == 'three_attempts': three_attempts_found = True - elif (r[0], r[1], r[2]) == (u'pam', u'ACTIVE', u'AUTHENTICATION'): + elif r[0] == u'pam': pam_found = True pam_plugin_name = r[3].split('.')[0] if pam_plugin_name == 'auth_pam': @@ -83,8 +86,12 @@ class TestAuthentication(base.PyMySQLTestCase): # https://mariadb.com/kb/en/mariadb/pam-authentication-plugin/ # Names differ but functionality is close - elif (r[0], r[1], r[2]) == (u'mysql_old_password', u'ACTIVE', u'AUTHENTICATION'): + elif r[0] == u'mysql_old_password': mysql_old_password_found = True + elif r[0] == u'sha256_password': + sha256_password_found = True + #else: + # print("plugin: %r" % r[0]) def test_plugin(self): # Bit of an assumption that the current user is a native password @@ -111,7 +118,6 @@ class TestAuthentication(base.PyMySQLTestCase): raise unittest2.SkipTest('we couldn\'t install the socket plugin') finally: if TestAuthentication.socket_found: - cur = self.connections[0].cursor() cur.execute("uninstall plugin %s" % self.socket_plugin_name) @unittest2.skipUnless(socket_auth, "connection to unix_socket required") @@ -174,7 +180,6 @@ class TestAuthentication(base.PyMySQLTestCase): raise unittest2.SkipTest('we couldn\'t install the two_questions plugin') finally: if TestAuthentication.two_questions_found: - cur = self.connections[0].cursor() cur.execute("uninstall plugin two_questions") @unittest2.skipUnless(socket_auth, "connection to unix_socket required") @@ -205,7 +210,6 @@ class TestAuthentication(base.PyMySQLTestCase): raise unittest2.SkipTest('we couldn\'t install the three_attempts plugin') finally: if TestAuthentication.three_attempts_found: - cur = self.connections[0].cursor() cur.execute("uninstall plugin three_attempts") @unittest2.skipUnless(socket_auth, "connection to unix_socket required") @@ -295,6 +299,20 @@ class TestAuthentication(base.PyMySQLTestCase): cur.execute("SELECT VERSION()") c.execute('set global secure_auth=%r' % secure_auth_setting) + @unittest2.skipUnless(socket_auth, "connection to unix_socket required") + @unittest2.skipUnless(sha256_password_found, "no sha256 password authentication plugin found") + def testAuthSHA256(self): + c = self.connections[0].cursor() + with TempUser(c, 'pymysql_sha256@localhost', + self.databases[0]['db'], 'sha256_password') as u: + c.execute('SET old_passwords = 2') + c.execute("SET PASSWORD FOR 'pymysql_sha256'@'localhost' = PASSWORD('Sh@256Pa33')") + db = self.db.copy() + db['password'] = "Sh@256Pa33" + # not implemented yet so thows error + with self.assertRaises(pymysql.err.OperationalError): + pymysql.connect(user='pymysql_256', **db) + class TestConnection(base.PyMySQLTestCase): def test_utf8mb4(self):