Add auth checks to ec2 credential crud operations

* Re-enables ec2 crud authorization tests
* Fixes bug 928471

Change-Id: I22a97a8659ade5d146b52d112ff66ea58f847ef7
This commit is contained in:
Brian Waldon 2012-02-08 10:46:33 -08:00
parent 3364905041
commit a3d21f06ad
3 changed files with 73 additions and 27 deletions

View File

@ -194,8 +194,8 @@ class Ec2Controller(wsgi.Application):
:param tenant_id: id of tenant
:returns: credential: dict of ec2 credential
"""
# TODO(termie): validate that this request is valid for given user
# tenant
if not self._is_admin(context):
self._assert_identity(context, user_id)
cred_ref = {'user_id': user_id,
'tenant_id': tenant_id,
'access': uuid.uuid4().hex,
@ -210,9 +210,8 @@ class Ec2Controller(wsgi.Application):
:param user_id: id of user
:returns: credentials: list of ec2 credential dicts
"""
# TODO(termie): validate that this request is valid for given user
# tenant
if not self._is_admin(context):
self._assert_identity(context, user_id)
return {'credentials': self.ec2_api.list_credentials(context, user_id)}
def get_credential(self, context, user_id, credential_id):
@ -225,8 +224,8 @@ class Ec2Controller(wsgi.Application):
:param credential_id: access key for credentials
:returns: credential: dict of ec2 credential
"""
# TODO(termie): validate that this request is valid for given user
# tenant
if not self._is_admin(context):
self._assert_identity(context, user_id)
return {'credential': self.ec2_api.get_credential(context,
credential_id)}
@ -240,6 +239,47 @@ class Ec2Controller(wsgi.Application):
:param credential_id: access key for credentials
:returns: bool: success
"""
# TODO(termie): validate that this request is valid for given user
# tenant
if not self._is_admin(context):
self._assert_identity(context, user_id)
self._assert_owner(context, user_id, credential_id)
return self.ec2_api.delete_credential(context, credential_id)
def _assert_identity(self, context, user_id):
"""Check that the provided token belongs to the user.
:param context: standard context
:param user_id: id of user
:raises webob.exc.HTTPForbidden: when token is invalid
"""
token_ref = self.token_api.get_token(context=context,
token_id=context['token_id'])
token_user_id = token_ref['user'].get('id')
if not token_user_id == user_id:
raise webob.exc.HTTPForbidden()
def _is_admin(self, context):
"""Wrap admin assertion error return statement.
:param context: standard context
:returns: bool: success
"""
try:
self.assert_admin(context)
return True
except AssertionError:
return False
def _assert_owner(self, context, user_id, credential_id):
"""Ensure the provided user owns the credential.
:param context: standard context
:param user_id: expected credential owner
:param credential_id: id of credential object
:raises webob.exc.HTTPForbidden: on failure
"""
cred_ref = self.ec2_api.get_credential(context, credential_id)
if not user_id == cred_ref['user_id']:
raise webob.exc.HTTPForbidden()

View File

@ -76,3 +76,12 @@ class CliMasterTestCase(test_keystoneclient.KcMasterTestCase):
def test_service_create_and_delete(self):
raise nose.exc.SkipTest('cli testing code does not handle 404 well')
def test_ec2_credentials_list_user_forbidden(self):
raise nose.exc.SkipTest('cli testing code does not handle 403 well')
def test_ec2_credentials_get_user_forbidden(self):
raise nose.exc.SkipTest('cli testing code does not handle 403 well')
def test_ec2_credentials_delete_user_forbidden(self):
raise nose.exc.SkipTest('cli testing code does not handle 403 well')

View File

@ -285,45 +285,42 @@ class KeystoneClientTests(object):
got = client.ec2.get(user_id=self.user_foo['id'], access=cred.access)
self.assertEquals(cred, got)
# FIXME(ja): need to test ec2 validation here
client.ec2.delete(user_id=self.user_foo['id'], access=cred.access)
creds = client.ec2.list(user_id=self.user_foo['id'])
self.assertEquals(creds, [])
def test_ec2_credentials_list_unauthorized_user(self):
raise nose.exc.SkipTest('TODO')
def test_ec2_credentials_list_user_forbidden(self):
from keystoneclient import exceptions as client_exceptions
two = self.get_client(self.user_two)
self.assertRaises(client_exceptions.Unauthorized, two.ec2.list,
self.user_foo['id'])
self.assertRaises(client_exceptions.Forbidden, two.ec2.list,
user_id=self.user_foo['id'])
def test_ec2_credentials_get_unauthorized_user(self):
raise nose.exc.SkipTest('TODO')
def test_ec2_credentials_get_user_forbidden(self):
from keystoneclient import exceptions as client_exceptions
foo = self.get_client()
cred = foo.ec2.create(self.user_foo['id'], self.tenant_bar['id'])
cred = foo.ec2.create(user_id=self.user_foo['id'],
tenant_id=self.tenant_bar['id'])
two = self.get_client(self.user_two)
self.assertRaises(client_exceptions.Unauthorized, two.ec2.get,
self.user_foo['id'], cred.access)
self.assertRaises(client_exceptions.Forbidden, two.ec2.get,
user_id=self.user_foo['id'], access=cred.access)
foo.ec2.delete(self.user_foo['id'], cred.access)
foo.ec2.delete(user_id=self.user_foo['id'], access=cred.access)
def test_ec2_credentials_delete_unauthorized_user(self):
raise nose.exc.SkipTest('TODO')
def test_ec2_credentials_delete_user_forbidden(self):
from keystoneclient import exceptions as client_exceptions
foo = self.get_client()
cred = foo.ec2.create(self.user_foo['id'], self.tenant_bar['id'])
cred = foo.ec2.create(user_id=self.user_foo['id'],
tenant_id=self.tenant_bar['id'])
two = self.get_client(self.user_two)
self.assertRaises(client_exceptions.Unauthorized, two.ec2.delete,
self.user_foo['id'], cred.access)
self.assertRaises(client_exceptions.Forbidden, two.ec2.delete,
user_id=self.user_foo['id'], access=cred.access)
foo.ec2.delete(self.user_foo['id'], cred.access)
foo.ec2.delete(user_id=self.user_foo['id'], access=cred.access)
def test_service_create_and_delete(self):
from keystoneclient import exceptions as client_exceptions