add tests for old_password. Fix py3 old password authentication. empty plugin name occurs with PLUGIN_AUTH meaning old passwords

This commit is contained in:
Daniel Black
2015-08-27 15:55:03 +10:00
parent ea45a731ca
commit a52e1b7c6a
2 changed files with 67 additions and 16 deletions

View File

@@ -195,7 +195,8 @@ def _hash_password_323(password):
add = 7
nr2 = 0x12345671
for c in [byte2int(x) for x in password if x not in (' ', '\t')]:
# x in py3 is numbers, p27 is chars
for c in [byte2int(x) for x in password if x not in (' ', '\t', 32, 9)]:
nr ^= (((nr & 63) + add) * c) + (nr << 8) & 0xFFFFFFFF
nr2 = (nr2 + ((nr2 << 8) ^ nr)) & 0xFFFFFFFF
add = (add + c) & 0xFFFFFFFF
@@ -1088,9 +1089,9 @@ class Connection(object):
if auth_packet.is_auth_switch_request():
# https://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol::AuthSwitchRequest
if self.server_capabilities & CLIENT.PLUGIN_AUTH:
auth_packet.read_uint8() # 0xfe packet identifier
plugin_name = auth_packet.read_string()
if self.server_capabilities & CLIENT.PLUGIN_AUTH and plugin_name is not None:
auth_packet = self._process_auth(plugin_name, auth_packet)
else:
# send legacy handshake
@@ -1105,21 +1106,21 @@ class Connection(object):
try:
return handler.authenticate(auth_pkt)
except AttributeError:
if plugin_name != 'dialog':
if plugin_name != b'dialog':
raise err.OperationalError(2059, "Authentication plugin '%s'" +
" not loaded: - missing authenticate method" % plugin)
else:
handler = None
if plugin_name == "mysql_native_password":
if plugin_name == b"mysql_native_password":
# https://dev.mysql.com/doc/internals/en/secure-password-authentication.html#packet-Authentication::Native41
data = _scramble(self.password.encode('latin1'), auth_packet.read_all()) + b'\0'
elif plugin_name == "mysql_old_password":
elif plugin_name == b"mysql_old_password":
# https://dev.mysql.com/doc/internals/en/old-password-authentication.html
data = _scramble_323(self.password.encode('latin1'), auth_packet.read_all()) + b'\0'
elif plugin_name == "mysql_clear_password":
elif plugin_name == b"mysql_clear_password":
# https://dev.mysql.com/doc/internals/en/clear-text-authentication.html
data = self.password.encode('latin1') + b'\0'
elif plugin_name == "dialog":
elif plugin_name == b"dialog":
pkt = auth_packet
while True:
flag = pkt.read_uint8()

View File

@@ -1,5 +1,6 @@
import datetime
import decimal
import sys
import pymysql
import time
import unittest2
@@ -7,12 +8,15 @@ from pymysql.tests import base
class TempUser:
def __init__(self, c, user, db, auth, authdata=None):
def __init__(self, c, user, db, auth=None, authdata=None, password=None):
self._c = c
self._user = user
self._db = db
create = "CREATE USER %s" \
" IDENTIFIED WITH %s" % (user, auth)
create = "CREATE USER " + user
if password is not None:
create += " IDENTIFIED BY '%s'" % password
elif auth is not None:
create += " IDENTIFIED WITH %s" % auth
if authdata is not None:
create += " AS '%s'" % authdata
try:
@@ -43,6 +47,7 @@ class TestAuthentication(base.PyMySQLTestCase):
two_questions_found = False
three_attempts_found = False
pam_found = False
mysql_old_password_found = False
import os
osuser = os.environ.get('USER')
@@ -61,12 +66,12 @@ class TestAuthentication(base.PyMySQLTestCase):
if (r[1], r[2], r[3]) == (u'ACTIVE', u'AUTHENTICATION', u'auth_socket.so'):
socket_plugin_name = r[0]
socket_found = True
if (r[1], r[2], r[3]) == (u'ACTIVE', u'AUTHENTICATION', u'dialog_examples.so'):
elif (r[1], r[2], r[3]) == (u'ACTIVE', u'AUTHENTICATION', u'dialog_examples.so'):
if r[0] == 'two_questions':
two_questions_found = True
elif r[0] == 'three_attempts':
three_attempts_found = True
if (r[0], r[1], r[2]) == (u'pam', u'ACTIVE', u'AUTHENTICATION'):
elif (r[0], r[1], r[2]) == (u'pam', u'ACTIVE', u'AUTHENTICATION'):
pam_found = True
pam_plugin_name = r[3].split('.')[0]
if pam_plugin_name == 'auth_pam':
@@ -78,6 +83,8 @@ class TestAuthentication(base.PyMySQLTestCase):
# https://mariadb.com/kb/en/mariadb/pam-authentication-plugin/
# Names differ but functionality is close
elif (r[0], r[1], r[2]) == (u'mysql_old_password', u'ACTIVE', u'AUTHENTICATION'):
mysql_old_password_found = True
def test_plugin(self):
# Bit of an assumption that the current user is a native password
@@ -163,6 +170,49 @@ class TestAuthentication(base.PyMySQLTestCase):
return
# else we had 'bad guess at password' work with pam. Well cool
# select old_password("crummy p\tassword");
#| old_password("crummy p\tassword") |
#| 2a01785203b08770 |
@unittest2.skipUnless(socket_auth, "connection to unix_socket required")
@unittest2.skipUnless(mysql_old_password_found, "no mysql_old_password plugin")
def testMySQLOldPasswordAuth(self):
if self.mysql_server_is(self.connections[0], (5, 7, 0)):
raise unittest2.SkipTest('Old passwords aren\'t supported in 5.7')
# pymysql.err.OperationalError: (1045, "Access denied for user 'old_pass_user'@'localhost' (using password: YES)")
# from login in MySQL-5.6
if self.mysql_server_is(self.connections[0], (5, 6, 0)):
raise unittest2.SkipTest('Old passwords don\'t authenticate in 5.6')
db = self.db.copy()
db['password'] = "crummy p\tassword"
with self.connections[0] as c:
# deprecated in 5.6
if sys.version_info[0:2] >= (3,2) and self.mysql_server_is(self.connections[0], (5, 6, 0)):
with self.assertWarns(pymysql.err.Warning) as cm:
c.execute("SELECT OLD_PASSWORD('%s')" % db['password'])
else:
c.execute("SELECT OLD_PASSWORD('%s')" % db['password'])
v = c.fetchone()[0]
self.assertEqual(v, '2a01785203b08770')
# only works in MariaDB and MySQL-5.6 - can't separate out by version
#if self.mysql_server_is(self.connections[0], (5, 5, 0)):
# with TempUser(c, 'old_pass_user@localhost',
# self.databases[0]['db'], 'mysql_old_password', '2a01785203b08770') as u:
# cur = pymysql.connect(user='old_pass_user', **db).cursor()
# cur.execute("SELECT VERSION()")
c.execute("SELECT @@secure_auth")
secure_auth_setting = c.fetchone()[0]
c.execute('set old_passwords=1')
# pymysql.err.Warning: 'pre-4.1 password hash' is deprecated and will be removed in a future release. Please use post-4.1 password hash instead
if sys.version_info[0:2] >= (3,2) and self.mysql_server_is(self.connections[0], (5, 6, 0)):
with self.assertWarns(pymysql.err.Warning) as cm:
c.execute('set global secure_auth=0')
else:
c.execute('set global secure_auth=0')
with TempUser(c, 'old_pass_user@localhost',
self.databases[0]['db'], password=db['password']) as u:
cur = pymysql.connect(user='old_pass_user', **db).cursor()
cur.execute("SELECT VERSION()")
c.execute('set global secure_auth=%r' % secure_auth_setting)
class TestConnection(base.PyMySQLTestCase):