diff --git a/pymysql/connections.py b/pymysql/connections.py index 2d18c46..0b06cc5 100644 --- a/pymysql/connections.py +++ b/pymysql/connections.py @@ -195,7 +195,8 @@ def _hash_password_323(password): add = 7 nr2 = 0x12345671 - for c in [byte2int(x) for x in password if x not in (' ', '\t')]: + # x in py3 is numbers, p27 is chars + for c in [byte2int(x) for x in password if x not in (' ', '\t', 32, 9)]: nr ^= (((nr & 63) + add) * c) + (nr << 8) & 0xFFFFFFFF nr2 = (nr2 + ((nr2 << 8) ^ nr)) & 0xFFFFFFFF add = (add + c) & 0xFFFFFFFF @@ -1088,9 +1089,9 @@ class Connection(object): if auth_packet.is_auth_switch_request(): # https://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol::AuthSwitchRequest - if self.server_capabilities & CLIENT.PLUGIN_AUTH: - auth_packet.read_uint8() # 0xfe packet identifier - plugin_name = auth_packet.read_string() + auth_packet.read_uint8() # 0xfe packet identifier + plugin_name = auth_packet.read_string() + if self.server_capabilities & CLIENT.PLUGIN_AUTH and plugin_name is not None: auth_packet = self._process_auth(plugin_name, auth_packet) else: # send legacy handshake @@ -1105,21 +1106,21 @@ class Connection(object): try: return handler.authenticate(auth_pkt) except AttributeError: - if plugin_name != 'dialog': + if plugin_name != b'dialog': raise err.OperationalError(2059, "Authentication plugin '%s'" + " not loaded: - missing authenticate method" % plugin) else: handler = None - if plugin_name == "mysql_native_password": + if plugin_name == b"mysql_native_password": # https://dev.mysql.com/doc/internals/en/secure-password-authentication.html#packet-Authentication::Native41 data = _scramble(self.password.encode('latin1'), auth_packet.read_all()) + b'\0' - elif plugin_name == "mysql_old_password": + elif plugin_name == b"mysql_old_password": # https://dev.mysql.com/doc/internals/en/old-password-authentication.html data = _scramble_323(self.password.encode('latin1'), auth_packet.read_all()) + b'\0' - elif plugin_name == "mysql_clear_password": + elif plugin_name == b"mysql_clear_password": # https://dev.mysql.com/doc/internals/en/clear-text-authentication.html data = self.password.encode('latin1') + b'\0' - elif plugin_name == "dialog": + elif plugin_name == b"dialog": pkt = auth_packet while True: flag = pkt.read_uint8() diff --git a/pymysql/tests/test_connection.py b/pymysql/tests/test_connection.py index e4ab01d..99cbecd 100644 --- a/pymysql/tests/test_connection.py +++ b/pymysql/tests/test_connection.py @@ -1,5 +1,6 @@ import datetime import decimal +import sys import pymysql import time import unittest2 @@ -7,14 +8,17 @@ from pymysql.tests import base class TempUser: - def __init__(self, c, user, db, auth, authdata=None): + def __init__(self, c, user, db, auth=None, authdata=None, password=None): self._c = c self._user = user self._db = db - create = "CREATE USER %s" \ - " IDENTIFIED WITH %s" % (user, auth) - if authdata is not None: - create += " AS '%s'" % authdata + create = "CREATE USER " + user + if password is not None: + create += " IDENTIFIED BY '%s'" % password + elif auth is not None: + create += " IDENTIFIED WITH %s" % auth + if authdata is not None: + create += " AS '%s'" % authdata try: c.execute(create) self._created = True @@ -43,6 +47,7 @@ class TestAuthentication(base.PyMySQLTestCase): two_questions_found = False three_attempts_found = False pam_found = False + mysql_old_password_found = False import os osuser = os.environ.get('USER') @@ -61,12 +66,12 @@ class TestAuthentication(base.PyMySQLTestCase): if (r[1], r[2], r[3]) == (u'ACTIVE', u'AUTHENTICATION', u'auth_socket.so'): socket_plugin_name = r[0] socket_found = True - if (r[1], r[2], r[3]) == (u'ACTIVE', u'AUTHENTICATION', u'dialog_examples.so'): + elif (r[1], r[2], r[3]) == (u'ACTIVE', u'AUTHENTICATION', u'dialog_examples.so'): if r[0] == 'two_questions': two_questions_found = True elif r[0] == 'three_attempts': three_attempts_found = True - if (r[0], r[1], r[2]) == (u'pam', u'ACTIVE', u'AUTHENTICATION'): + elif (r[0], r[1], r[2]) == (u'pam', u'ACTIVE', u'AUTHENTICATION'): pam_found = True pam_plugin_name = r[3].split('.')[0] if pam_plugin_name == 'auth_pam': @@ -78,6 +83,8 @@ class TestAuthentication(base.PyMySQLTestCase): # https://mariadb.com/kb/en/mariadb/pam-authentication-plugin/ # Names differ but functionality is close + elif (r[0], r[1], r[2]) == (u'mysql_old_password', u'ACTIVE', u'AUTHENTICATION'): + mysql_old_password_found = True def test_plugin(self): # Bit of an assumption that the current user is a native password @@ -163,6 +170,49 @@ class TestAuthentication(base.PyMySQLTestCase): return # else we had 'bad guess at password' work with pam. Well cool + # select old_password("crummy p\tassword"); + #| old_password("crummy p\tassword") | + #| 2a01785203b08770 | + @unittest2.skipUnless(socket_auth, "connection to unix_socket required") + @unittest2.skipUnless(mysql_old_password_found, "no mysql_old_password plugin") + def testMySQLOldPasswordAuth(self): + if self.mysql_server_is(self.connections[0], (5, 7, 0)): + raise unittest2.SkipTest('Old passwords aren\'t supported in 5.7') + # pymysql.err.OperationalError: (1045, "Access denied for user 'old_pass_user'@'localhost' (using password: YES)") + # from login in MySQL-5.6 + if self.mysql_server_is(self.connections[0], (5, 6, 0)): + raise unittest2.SkipTest('Old passwords don\'t authenticate in 5.6') + db = self.db.copy() + db['password'] = "crummy p\tassword" + with self.connections[0] as c: + # deprecated in 5.6 + if sys.version_info[0:2] >= (3,2) and self.mysql_server_is(self.connections[0], (5, 6, 0)): + with self.assertWarns(pymysql.err.Warning) as cm: + c.execute("SELECT OLD_PASSWORD('%s')" % db['password']) + else: + c.execute("SELECT OLD_PASSWORD('%s')" % db['password']) + v = c.fetchone()[0] + self.assertEqual(v, '2a01785203b08770') + # only works in MariaDB and MySQL-5.6 - can't separate out by version + #if self.mysql_server_is(self.connections[0], (5, 5, 0)): + # with TempUser(c, 'old_pass_user@localhost', + # self.databases[0]['db'], 'mysql_old_password', '2a01785203b08770') as u: + # cur = pymysql.connect(user='old_pass_user', **db).cursor() + # cur.execute("SELECT VERSION()") + c.execute("SELECT @@secure_auth") + secure_auth_setting = c.fetchone()[0] + c.execute('set old_passwords=1') + # pymysql.err.Warning: 'pre-4.1 password hash' is deprecated and will be removed in a future release. Please use post-4.1 password hash instead + if sys.version_info[0:2] >= (3,2) and self.mysql_server_is(self.connections[0], (5, 6, 0)): + with self.assertWarns(pymysql.err.Warning) as cm: + c.execute('set global secure_auth=0') + else: + c.execute('set global secure_auth=0') + with TempUser(c, 'old_pass_user@localhost', + self.databases[0]['db'], password=db['password']) as u: + cur = pymysql.connect(user='old_pass_user', **db).cursor() + cur.execute("SELECT VERSION()") + c.execute('set global secure_auth=%r' % secure_auth_setting) class TestConnection(base.PyMySQLTestCase):