Refactor and unit test json auth parsing

There's a bunch of code copied-and-pasted between the json parsers
in the two auth types. Refactor the code out into a base class.

First, though, unit test the code to ensure no regressions.

v2: update test_user_auth_with_no_{name,password} to expect a
    400 return instead of 401.

Change-Id: I50802419ec82d98f4cb56e93c163158d3326c5c9
This commit is contained in:
Mark McLoughlin 2011-09-30 14:28:02 +01:00
parent 4ed44af629
commit 12b501e557
4 changed files with 231 additions and 55 deletions

View File

@ -19,6 +19,7 @@ Josh Kearney <josh@jk0.org>
Kevin L. Mitchell <kevin.mitchell@rackspace.com>
Khaled Hussein <khaled.hussein@gmail.com>
Mark Gius <mgius7096@gmail.com>
Mark McLoughlin <markmc@redhat.com>
Monty Taylor <mordred@inaugust.com>
Paul VOccio <paul@substation9.com>
Ramana Juvvadi <rjuvvadi@hcl.com>

View File

@ -20,12 +20,51 @@ from keystone.logic.types import fault
import keystone.backends.api as db_api
class AuthWithUnscopedToken(object):
def __init__(self, token_id, tenant_id=None, tenant_name=None):
self.token_id = token_id
class AuthBase(object):
def __init__(self, tenant_id=None, tenant_name=None):
self.tenant_id = tenant_id
self.tenant_name = tenant_name
@staticmethod
def _validate_auth(obj, *valid_keys):
if not 'auth' in obj:
raise fault.BadRequestFault('Expecting auth')
auth = obj.get('auth')
for key in auth:
if not key in valid_keys:
raise fault.BadRequestFault('Invalid attribute(s): %s' % key)
if auth.get('tenantId') and auth.get('tenantName'):
raise fault.BadRequestFault(
'Expecting either Tenant ID or Tenant Name, but not both')
return auth
@staticmethod
def _validate_key(obj, key, *required_keys):
if not key in obj:
raise fault.BadRequestFault('Expecting %s' % key)
ret = obj[key]
for skey in ret:
if not skey in required_keys:
raise fault.BadRequestFault('Invalid attribute(s): %s' % skey)
for required_key in required_keys:
if not ret.get(required_key):
raise fault.BadRequestFault('Expecting %s:%s' %
(key, required_key))
return ret
class AuthWithUnscopedToken(AuthBase):
def __init__(self, token_id, tenant_id=None, tenant_name=None):
super(AuthWithUnscopedToken, self).__init__(tenant_id, tenant_name)
self.token_id = token_id
@staticmethod
def from_xml(xml_str):
try:
@ -59,33 +98,24 @@ class AuthWithUnscopedToken(object):
def from_json(json_str):
try:
obj = json.loads(json_str)
if not obj.get("auth"):
raise fault.BadRequestFault("Expecting auth")
if not obj["auth"].get("token"):
raise fault.BadRequestFault("Expecting token")
token = obj['auth']['token']
if not token.get("id"):
raise fault.BadRequestFault("Expecting token id")
token_id = obj["auth"]["token"]["id"]
tenant_id = obj["auth"].get("tenantId")
tenant_name = obj["auth"].get("tenantName")
auth = AuthBase._validate_auth(obj, 'tenantId', 'tenantName',
'token')
token = AuthBase._validate_key(auth, 'token', 'id')
if tenant_id and tenant_name:
raise fault.BadRequestFault(
"Expecting either Tenant ID or Tenant Name, but not both")
return AuthWithUnscopedToken(token_id, tenant_id, tenant_name)
return AuthWithUnscopedToken(token['id'],
auth.get('tenantId'),
auth.get('tenantName'))
except (ValueError, TypeError) as e:
raise fault.BadRequestFault("Cannot parse auth", str(e))
class AuthWithPasswordCredentials(object):
class AuthWithPasswordCredentials(AuthBase):
def __init__(self, username, password, tenant_id=None, tenant_name=None):
super(AuthWithPasswordCredentials, self).__init__(tenant_id,
tenant_name)
self.username = username
self.password = password
self.tenant_id = tenant_id
self.tenant_name = tenant_name
@staticmethod
def from_xml(xml_str):
@ -123,35 +153,16 @@ class AuthWithPasswordCredentials(object):
def from_json(json_str):
try:
obj = json.loads(json_str)
if not "auth" in obj:
raise fault.BadRequestFault("Expecting auth")
auth = obj["auth"]
invalid = [key for key in auth if key not in\
['tenantId', 'tenantName', 'passwordCredentials']]
if invalid != []:
raise fault.BadRequestFault("Invalid attribute(s): %s"
% invalid)
tenant_id = auth.get('tenantId')
tenant_name = auth.get('tenantName')
auth = AuthBase._validate_auth(obj, 'tenantId', 'tenantName',
'passwordCredentials')
cred = AuthBase._validate_key(auth, 'passwordCredentials',
'username', 'password')
if not "passwordCredentials" in auth:
raise fault.BadRequestFault("Expecting passwordCredentials")
cred = auth["passwordCredentials"]
# Check that fields are valid
invalid = [key for key in cred if key not in\
['username', 'password']]
if invalid != []:
raise fault.BadRequestFault("Invalid attribute(s): %s"
% invalid)
if not "username" in cred:
raise fault.BadRequestFault("Expecting a username")
username = cred["username"]
if not "password" in cred:
raise fault.BadRequestFault("Expecting a password")
password = cred["password"]
return AuthWithPasswordCredentials(username, password, tenant_id,
tenant_name)
return AuthWithPasswordCredentials(cred['username'],
cred['password'],
auth.get('tenantId'),
auth.get('tenantName'))
except (ValueError, TypeError) as e:
raise fault.BadRequestFault("Cannot parse auth", str(e))

View File

@ -293,11 +293,10 @@ class TestServiceAuthentication(common.FunctionalTestCase):
'password': self.user['password']}}})
def test_user_auth_with_no_name(self):
"""Authenticating without a username returns a 401"""
"""Authenticating without a username returns a 400"""
# Authenticate as user to get a token
self.post_token(assert_status=401, as_json={
self.post_token(assert_status=400, as_json={
'auth': {'passwordCredentials': {
'username': None,
'password': self.user['password']}}})
def test_user_auth_with_wrong_password(self):
@ -309,9 +308,9 @@ class TestServiceAuthentication(common.FunctionalTestCase):
'password': 'this-is-completely-wrong'}}})
def test_user_auth_with_no_password(self):
"""Authenticating with an invalid password returns a 401"""
"""Authenticating with an invalid password returns a 400"""
# Authenticate as user to get a token
self.post_token(assert_status=401, as_json={
self.post_token(assert_status=400, as_json={
'auth': {'passwordCredentials': {
'username': self.user['name'],
'password': None}}})

View File

@ -1,5 +1,7 @@
import json
import unittest2 as unittest
import keystone.logic.types.auth as auth
import keystone.logic.types.fault as fault
class TestAuth(unittest.TestCase):
@ -14,9 +16,172 @@ class TestAuth(unittest.TestCase):
def test_pwd_cred_marshall(self):
creds = auth.AuthWithPasswordCredentials.from_xml(self.pwd_xml)
self.assertTrue(creds.password, "secret")
self.assertTrue(creds.username, "username")
self.assertEqual(creds.password, "secret")
self.assertEqual(creds.username, "disabled")
def test_pwd_creds_from_json(self):
data = json.dumps({"auth":
{"passwordCredentials":
{"username": "foo", "password": "bar"}}})
creds = auth.AuthWithPasswordCredentials.from_json(data)
self.assertEqual(creds.username, "foo")
self.assertEqual(creds.password, "bar")
self.assertIsNone(creds.tenant_id)
self.assertIsNone(creds.tenant_name)
def test_pwd_creds_with_tenant_from_json(self):
data = json.dumps({"auth":
{"tenantName": "blaa",
"passwordCredentials":
{"username": "foo", "password": "bar"}}})
creds = auth.AuthWithPasswordCredentials.from_json(data)
self.assertEqual(creds.username, "foo")
self.assertEqual(creds.password, "bar")
self.assertIsNone(creds.tenant_id)
self.assertEqual(creds.tenant_name, "blaa")
def test_pwd_creds_with_tenant_from_json(self):
data = json.dumps({"auth":
{"tenantId": "blaa",
"passwordCredentials":
{"username": "foo", "password": "bar"}}})
creds = auth.AuthWithPasswordCredentials.from_json(data)
self.assertEqual(creds.username, "foo")
self.assertEqual(creds.password, "bar")
self.assertEqual(creds.tenant_id, "blaa")
self.assertIsNone(creds.tenant_name)
def test_pwd_not_both_tenant_from_json(self):
data = json.dumps({"auth": {"tenantId": "blaa", "tenantName": "aalb"}})
self.assertRaisesRegexp(fault.BadRequestFault,
"not both",
auth.AuthWithPasswordCredentials.from_json,
data)
def test_pwd_invalid_from_json(self):
self.assertRaisesRegexp(fault.BadRequestFault,
"Cannot parse",
auth.AuthWithPasswordCredentials.from_json,
"")
def test_pwd_no_auth_from_json(self):
data = json.dumps({"foo": "bar"})
self.assertRaisesRegexp(fault.BadRequestFault,
"Expecting auth",
auth.AuthWithPasswordCredentials.from_json,
data)
def test_pwd_no_creds_from_json(self):
data = json.dumps({"auth": {}})
self.assertRaisesRegexp(fault.BadRequestFault,
"Expecting passwordCredentials",
auth.AuthWithPasswordCredentials.from_json,
data)
def test_pwd_invalid_attribute_from_json(self):
data = json.dumps({"auth": {"foo": "bar"}})
self.assertRaisesRegexp(fault.BadRequestFault,
"Invalid",
auth.AuthWithPasswordCredentials.from_json,
data)
def test_pwd_no_username_from_json(self):
data = json.dumps({"auth": {"passwordCredentials": {}}})
self.assertRaisesRegexp(fault.BadRequestFault,
"Expecting passwordCredentials:username",
auth.AuthWithPasswordCredentials.from_json,
data)
def test_pwd_no_password_from_json(self):
data = json.dumps({"auth": {"passwordCredentials":
{"username": "foo"}}})
self.assertRaisesRegexp(fault.BadRequestFault,
"Expecting passwordCredentials:password",
auth.AuthWithPasswordCredentials.from_json,
data)
def test_pwd_invalid_creds_attribute_from_json(self):
data = json.dumps({"auth": {"passwordCredentials": {"bar": "foo"}}})
self.assertRaisesRegexp(fault.BadRequestFault,
"Invalid",
auth.AuthWithPasswordCredentials.from_json,
data)
def test_token_creds_from_json(self):
data = json.dumps({"auth": {"token": {"id": "1"}}})
creds = auth.AuthWithUnscopedToken.from_json(data)
self.assertEqual(creds.token_id, "1")
self.assertIsNone(creds.tenant_id)
self.assertIsNone(creds.tenant_name)
def test_token_creds_with_tenant_name_from_json(self):
data = json.dumps({"auth":
{"tenantName": "blaa",
"token": {"id": "1"}}})
creds = auth.AuthWithUnscopedToken.from_json(data)
self.assertEqual(creds.token_id, "1")
self.assertIsNone(creds.tenant_id)
self.assertEqual(creds.tenant_name, "blaa")
def test_token_creds_with_tenant_id_from_json(self):
data = json.dumps({"auth":
{"tenantId": "blaa",
"token": {"id": "1"}}})
creds = auth.AuthWithUnscopedToken.from_json(data)
self.assertEqual(creds.token_id, "1")
self.assertEqual(creds.tenant_id, "blaa")
self.assertIsNone(creds.tenant_name)
def test_token_not_both_tenant_from_json(self):
data = json.dumps({"auth":
{"tenantId": "blaa",
"tenantName": "aalb",
"token": {"id": "1"}}})
self.assertRaisesRegexp(fault.BadRequestFault,
"not both",
auth.AuthWithUnscopedToken.from_json,
data)
def test_token_invalid_from_json(self):
self.assertRaisesRegexp(fault.BadRequestFault,
"Cannot parse",
auth.AuthWithUnscopedToken.from_json,
"")
def test_token_no_auth_from_json(self):
data = json.dumps({"foo": "bar"})
self.assertRaisesRegexp(fault.BadRequestFault,
"Expecting auth",
auth.AuthWithUnscopedToken.from_json,
data)
def test_token_no_creds_from_json(self):
data = json.dumps({"auth": {}})
self.assertRaisesRegexp(fault.BadRequestFault,
"Expecting token",
auth.AuthWithUnscopedToken.from_json,
data)
def test_token_invalid_attribute_from_json(self):
data = json.dumps({"auth": {"foo": "bar"}})
self.assertRaisesRegexp(fault.BadRequestFault,
"Invalid",
auth.AuthWithUnscopedToken.from_json,
data)
def test_token_no_id_from_json(self):
data = json.dumps({"auth": {"token": {}}})
self.assertRaisesRegexp(fault.BadRequestFault,
"Expecting token:id",
auth.AuthWithUnscopedToken.from_json,
data)
def test_token_invalid_attribute_from_json(self):
data = json.dumps({"auth": {"token": {"bar": "foo"}}})
self.assertRaisesRegexp(fault.BadRequestFault,
"Invalid",
auth.AuthWithUnscopedToken.from_json,
data)
if __name__ == '__main__':
unittest.main()