From 12b501e557370a8b3be0fbb527285f6c31d9f865 Mon Sep 17 00:00:00 2001 From: Mark McLoughlin Date: Fri, 30 Sep 2011 14:28:02 +0100 Subject: [PATCH] Refactor and unit test json auth parsing There's a bunch of code copied-and-pasted between the json parsers in the two auth types. Refactor the code out into a base class. First, though, unit test the code to ensure no regressions. v2: update test_user_auth_with_no_{name,password} to expect a 400 return instead of 401. Change-Id: I50802419ec82d98f4cb56e93c163158d3326c5c9 --- AUTHORS | 1 + keystone/logic/types/auth.py | 107 ++++++++-------- keystone/test/functional/test_auth.py | 9 +- keystone/test/unit/test_auth.py | 169 +++++++++++++++++++++++++- 4 files changed, 231 insertions(+), 55 deletions(-) diff --git a/AUTHORS b/AUTHORS index 7046cc0cbf..7936b430be 100644 --- a/AUTHORS +++ b/AUTHORS @@ -19,6 +19,7 @@ Josh Kearney Kevin L. Mitchell Khaled Hussein Mark Gius +Mark McLoughlin Monty Taylor Paul VOccio Ramana Juvvadi diff --git a/keystone/logic/types/auth.py b/keystone/logic/types/auth.py index 059a668129..78d8ad49de 100755 --- a/keystone/logic/types/auth.py +++ b/keystone/logic/types/auth.py @@ -20,12 +20,51 @@ from keystone.logic.types import fault import keystone.backends.api as db_api -class AuthWithUnscopedToken(object): - def __init__(self, token_id, tenant_id=None, tenant_name=None): - self.token_id = token_id +class AuthBase(object): + def __init__(self, tenant_id=None, tenant_name=None): self.tenant_id = tenant_id self.tenant_name = tenant_name + @staticmethod + def _validate_auth(obj, *valid_keys): + if not 'auth' in obj: + raise fault.BadRequestFault('Expecting auth') + + auth = obj.get('auth') + + for key in auth: + if not key in valid_keys: + raise fault.BadRequestFault('Invalid attribute(s): %s' % key) + + if auth.get('tenantId') and auth.get('tenantName'): + raise fault.BadRequestFault( + 'Expecting either Tenant ID or Tenant Name, but not both') + + return auth + + @staticmethod + def _validate_key(obj, key, *required_keys): + if not key in obj: + raise fault.BadRequestFault('Expecting %s' % key) + + ret = obj[key] + + for skey in ret: + if not skey in required_keys: + raise fault.BadRequestFault('Invalid attribute(s): %s' % skey) + + for required_key in required_keys: + if not ret.get(required_key): + raise fault.BadRequestFault('Expecting %s:%s' % + (key, required_key)) + return ret + + +class AuthWithUnscopedToken(AuthBase): + def __init__(self, token_id, tenant_id=None, tenant_name=None): + super(AuthWithUnscopedToken, self).__init__(tenant_id, tenant_name) + self.token_id = token_id + @staticmethod def from_xml(xml_str): try: @@ -59,33 +98,24 @@ class AuthWithUnscopedToken(object): def from_json(json_str): try: obj = json.loads(json_str) - if not obj.get("auth"): - raise fault.BadRequestFault("Expecting auth") - if not obj["auth"].get("token"): - raise fault.BadRequestFault("Expecting token") - token = obj['auth']['token'] - if not token.get("id"): - raise fault.BadRequestFault("Expecting token id") - token_id = obj["auth"]["token"]["id"] - tenant_id = obj["auth"].get("tenantId") - tenant_name = obj["auth"].get("tenantName") + auth = AuthBase._validate_auth(obj, 'tenantId', 'tenantName', + 'token') + token = AuthBase._validate_key(auth, 'token', 'id') - if tenant_id and tenant_name: - raise fault.BadRequestFault( - "Expecting either Tenant ID or Tenant Name, but not both") - - return AuthWithUnscopedToken(token_id, tenant_id, tenant_name) + return AuthWithUnscopedToken(token['id'], + auth.get('tenantId'), + auth.get('tenantName')) except (ValueError, TypeError) as e: raise fault.BadRequestFault("Cannot parse auth", str(e)) -class AuthWithPasswordCredentials(object): +class AuthWithPasswordCredentials(AuthBase): def __init__(self, username, password, tenant_id=None, tenant_name=None): + super(AuthWithPasswordCredentials, self).__init__(tenant_id, + tenant_name) self.username = username self.password = password - self.tenant_id = tenant_id - self.tenant_name = tenant_name @staticmethod def from_xml(xml_str): @@ -123,35 +153,16 @@ class AuthWithPasswordCredentials(object): def from_json(json_str): try: obj = json.loads(json_str) - if not "auth" in obj: - raise fault.BadRequestFault("Expecting auth") - auth = obj["auth"] - invalid = [key for key in auth if key not in\ - ['tenantId', 'tenantName', 'passwordCredentials']] - if invalid != []: - raise fault.BadRequestFault("Invalid attribute(s): %s" - % invalid) - tenant_id = auth.get('tenantId') - tenant_name = auth.get('tenantName') + auth = AuthBase._validate_auth(obj, 'tenantId', 'tenantName', + 'passwordCredentials') + cred = AuthBase._validate_key(auth, 'passwordCredentials', + 'username', 'password') - if not "passwordCredentials" in auth: - raise fault.BadRequestFault("Expecting passwordCredentials") - cred = auth["passwordCredentials"] - # Check that fields are valid - invalid = [key for key in cred if key not in\ - ['username', 'password']] - if invalid != []: - raise fault.BadRequestFault("Invalid attribute(s): %s" - % invalid) - if not "username" in cred: - raise fault.BadRequestFault("Expecting a username") - username = cred["username"] - if not "password" in cred: - raise fault.BadRequestFault("Expecting a password") - password = cred["password"] - return AuthWithPasswordCredentials(username, password, tenant_id, - tenant_name) + return AuthWithPasswordCredentials(cred['username'], + cred['password'], + auth.get('tenantId'), + auth.get('tenantName')) except (ValueError, TypeError) as e: raise fault.BadRequestFault("Cannot parse auth", str(e)) diff --git a/keystone/test/functional/test_auth.py b/keystone/test/functional/test_auth.py index 50cda9b3fb..fd7f9d5b59 100644 --- a/keystone/test/functional/test_auth.py +++ b/keystone/test/functional/test_auth.py @@ -293,11 +293,10 @@ class TestServiceAuthentication(common.FunctionalTestCase): 'password': self.user['password']}}}) def test_user_auth_with_no_name(self): - """Authenticating without a username returns a 401""" + """Authenticating without a username returns a 400""" # Authenticate as user to get a token - self.post_token(assert_status=401, as_json={ + self.post_token(assert_status=400, as_json={ 'auth': {'passwordCredentials': { - 'username': None, 'password': self.user['password']}}}) def test_user_auth_with_wrong_password(self): @@ -309,9 +308,9 @@ class TestServiceAuthentication(common.FunctionalTestCase): 'password': 'this-is-completely-wrong'}}}) def test_user_auth_with_no_password(self): - """Authenticating with an invalid password returns a 401""" + """Authenticating with an invalid password returns a 400""" # Authenticate as user to get a token - self.post_token(assert_status=401, as_json={ + self.post_token(assert_status=400, as_json={ 'auth': {'passwordCredentials': { 'username': self.user['name'], 'password': None}}}) diff --git a/keystone/test/unit/test_auth.py b/keystone/test/unit/test_auth.py index 4242ce8c4f..0c70f3eddb 100755 --- a/keystone/test/unit/test_auth.py +++ b/keystone/test/unit/test_auth.py @@ -1,5 +1,7 @@ +import json import unittest2 as unittest import keystone.logic.types.auth as auth +import keystone.logic.types.fault as fault class TestAuth(unittest.TestCase): @@ -14,9 +16,172 @@ class TestAuth(unittest.TestCase): def test_pwd_cred_marshall(self): creds = auth.AuthWithPasswordCredentials.from_xml(self.pwd_xml) - self.assertTrue(creds.password, "secret") - self.assertTrue(creds.username, "username") + self.assertEqual(creds.password, "secret") + self.assertEqual(creds.username, "disabled") + def test_pwd_creds_from_json(self): + data = json.dumps({"auth": + {"passwordCredentials": + {"username": "foo", "password": "bar"}}}) + creds = auth.AuthWithPasswordCredentials.from_json(data) + self.assertEqual(creds.username, "foo") + self.assertEqual(creds.password, "bar") + self.assertIsNone(creds.tenant_id) + self.assertIsNone(creds.tenant_name) + + def test_pwd_creds_with_tenant_from_json(self): + data = json.dumps({"auth": + {"tenantName": "blaa", + "passwordCredentials": + {"username": "foo", "password": "bar"}}}) + creds = auth.AuthWithPasswordCredentials.from_json(data) + self.assertEqual(creds.username, "foo") + self.assertEqual(creds.password, "bar") + self.assertIsNone(creds.tenant_id) + self.assertEqual(creds.tenant_name, "blaa") + + def test_pwd_creds_with_tenant_from_json(self): + data = json.dumps({"auth": + {"tenantId": "blaa", + "passwordCredentials": + {"username": "foo", "password": "bar"}}}) + creds = auth.AuthWithPasswordCredentials.from_json(data) + self.assertEqual(creds.username, "foo") + self.assertEqual(creds.password, "bar") + self.assertEqual(creds.tenant_id, "blaa") + self.assertIsNone(creds.tenant_name) + + def test_pwd_not_both_tenant_from_json(self): + data = json.dumps({"auth": {"tenantId": "blaa", "tenantName": "aalb"}}) + self.assertRaisesRegexp(fault.BadRequestFault, + "not both", + auth.AuthWithPasswordCredentials.from_json, + data) + + def test_pwd_invalid_from_json(self): + self.assertRaisesRegexp(fault.BadRequestFault, + "Cannot parse", + auth.AuthWithPasswordCredentials.from_json, + "") + + def test_pwd_no_auth_from_json(self): + data = json.dumps({"foo": "bar"}) + self.assertRaisesRegexp(fault.BadRequestFault, + "Expecting auth", + auth.AuthWithPasswordCredentials.from_json, + data) + + def test_pwd_no_creds_from_json(self): + data = json.dumps({"auth": {}}) + self.assertRaisesRegexp(fault.BadRequestFault, + "Expecting passwordCredentials", + auth.AuthWithPasswordCredentials.from_json, + data) + + def test_pwd_invalid_attribute_from_json(self): + data = json.dumps({"auth": {"foo": "bar"}}) + self.assertRaisesRegexp(fault.BadRequestFault, + "Invalid", + auth.AuthWithPasswordCredentials.from_json, + data) + + def test_pwd_no_username_from_json(self): + data = json.dumps({"auth": {"passwordCredentials": {}}}) + self.assertRaisesRegexp(fault.BadRequestFault, + "Expecting passwordCredentials:username", + auth.AuthWithPasswordCredentials.from_json, + data) + + def test_pwd_no_password_from_json(self): + data = json.dumps({"auth": {"passwordCredentials": + {"username": "foo"}}}) + self.assertRaisesRegexp(fault.BadRequestFault, + "Expecting passwordCredentials:password", + auth.AuthWithPasswordCredentials.from_json, + data) + + def test_pwd_invalid_creds_attribute_from_json(self): + data = json.dumps({"auth": {"passwordCredentials": {"bar": "foo"}}}) + self.assertRaisesRegexp(fault.BadRequestFault, + "Invalid", + auth.AuthWithPasswordCredentials.from_json, + data) + + def test_token_creds_from_json(self): + data = json.dumps({"auth": {"token": {"id": "1"}}}) + creds = auth.AuthWithUnscopedToken.from_json(data) + self.assertEqual(creds.token_id, "1") + self.assertIsNone(creds.tenant_id) + self.assertIsNone(creds.tenant_name) + + def test_token_creds_with_tenant_name_from_json(self): + data = json.dumps({"auth": + {"tenantName": "blaa", + "token": {"id": "1"}}}) + creds = auth.AuthWithUnscopedToken.from_json(data) + self.assertEqual(creds.token_id, "1") + self.assertIsNone(creds.tenant_id) + self.assertEqual(creds.tenant_name, "blaa") + + def test_token_creds_with_tenant_id_from_json(self): + data = json.dumps({"auth": + {"tenantId": "blaa", + "token": {"id": "1"}}}) + creds = auth.AuthWithUnscopedToken.from_json(data) + self.assertEqual(creds.token_id, "1") + self.assertEqual(creds.tenant_id, "blaa") + self.assertIsNone(creds.tenant_name) + + def test_token_not_both_tenant_from_json(self): + data = json.dumps({"auth": + {"tenantId": "blaa", + "tenantName": "aalb", + "token": {"id": "1"}}}) + self.assertRaisesRegexp(fault.BadRequestFault, + "not both", + auth.AuthWithUnscopedToken.from_json, + data) + + def test_token_invalid_from_json(self): + self.assertRaisesRegexp(fault.BadRequestFault, + "Cannot parse", + auth.AuthWithUnscopedToken.from_json, + "") + + def test_token_no_auth_from_json(self): + data = json.dumps({"foo": "bar"}) + self.assertRaisesRegexp(fault.BadRequestFault, + "Expecting auth", + auth.AuthWithUnscopedToken.from_json, + data) + + def test_token_no_creds_from_json(self): + data = json.dumps({"auth": {}}) + self.assertRaisesRegexp(fault.BadRequestFault, + "Expecting token", + auth.AuthWithUnscopedToken.from_json, + data) + + def test_token_invalid_attribute_from_json(self): + data = json.dumps({"auth": {"foo": "bar"}}) + self.assertRaisesRegexp(fault.BadRequestFault, + "Invalid", + auth.AuthWithUnscopedToken.from_json, + data) + + def test_token_no_id_from_json(self): + data = json.dumps({"auth": {"token": {}}}) + self.assertRaisesRegexp(fault.BadRequestFault, + "Expecting token:id", + auth.AuthWithUnscopedToken.from_json, + data) + + def test_token_invalid_attribute_from_json(self): + data = json.dumps({"auth": {"token": {"bar": "foo"}}}) + self.assertRaisesRegexp(fault.BadRequestFault, + "Invalid", + auth.AuthWithUnscopedToken.from_json, + data) if __name__ == '__main__': unittest.main()