Move auth_token middleware from admin user to an RBAC policy
Before this patch auth_token middleware required admin user credentials stored in assorted config files. With this patch only non-admin user credentials are needed. The revocation_list and validate_token commands use an policy.json rule, to only allow these commands if you are in have the service role. Rule used: "service_role": [["role:service"]], "service_or_admin": [["rule:admin_required"], ["rule:service_role"]], Added the policy wrapper on the validate functions. Fixes bug 1153789 Change-Id: I43986e26b16aa5213ad2536a0d07d942bf3dbbbb
This commit is contained in:
parent
96a816f50d
commit
3c3f5dc897
|
@ -1,5 +1,7 @@
|
|||
{
|
||||
"admin_required": [["role:admin"], ["is_admin:1"]],
|
||||
"service_role": [["role:service"]],
|
||||
"service_or_admin": [["rule:admin_required"], ["rule:service_role"]],
|
||||
"owner" : [["user_id:%(user_id)s"]],
|
||||
"admin_or_owner": [["rule:admin_required"], ["rule:owner"]],
|
||||
|
||||
|
@ -71,8 +73,9 @@
|
|||
"identity:delete_policy": [["rule:admin_required"]],
|
||||
|
||||
"identity:check_token": [["rule:admin_required"]],
|
||||
"identity:validate_token": [["rule:admin_required"]],
|
||||
"identity:revocation_list": [["rule:admin_required"]],
|
||||
"identity:validate_token": [["rule:service_or_admin"]],
|
||||
"identity:validate_token_head": [["rule:service_or_admin"]],
|
||||
"identity:revocation_list": [["rule:service_or_admin"]],
|
||||
"identity:revoke_token": [["rule:admin_required"],
|
||||
["user_id:%(user_id)s"]],
|
||||
|
||||
|
|
|
@ -88,7 +88,7 @@ def flatten(d, parent_key=''):
|
|||
def protected(f):
|
||||
"""Wraps API calls with role based access controls (RBAC)."""
|
||||
@functools.wraps(f)
|
||||
def wrapper(self, context, **kwargs):
|
||||
def wrapper(self, context, *args, **kwargs):
|
||||
if 'is_admin' in context and context['is_admin']:
|
||||
LOG.warning(_('RBAC: Bypassing authorization'))
|
||||
else:
|
||||
|
@ -101,7 +101,7 @@ def protected(f):
|
|||
self.policy_api.enforce(context, creds, action, flatten(kwargs))
|
||||
LOG.debug(_('RBAC: Authorization granted'))
|
||||
|
||||
return f(self, context, **kwargs)
|
||||
return f(self, context, *args, **kwargs)
|
||||
return wrapper
|
||||
|
||||
|
||||
|
|
|
@ -456,8 +456,6 @@ class Auth(controller.V2Controller):
|
|||
Optionally, limited to a token owned by a specific tenant.
|
||||
|
||||
"""
|
||||
# TODO(termie): this stuff should probably be moved to middleware
|
||||
self.assert_admin(context)
|
||||
data = self.token_api.get_token(context=context,
|
||||
token_id=token_id)
|
||||
if belongs_to:
|
||||
|
@ -509,7 +507,7 @@ class Auth(controller.V2Controller):
|
|||
if project_ref['domain_id'] != DEFAULT_DOMAIN_ID:
|
||||
raise exception.Unauthorized(msg)
|
||||
|
||||
# admin only
|
||||
@controller.protected
|
||||
def validate_token_head(self, context, token_id):
|
||||
"""Check that a token is valid.
|
||||
|
||||
|
@ -523,7 +521,7 @@ class Auth(controller.V2Controller):
|
|||
assert token_ref
|
||||
self._assert_default_domain(context, token_ref)
|
||||
|
||||
# admin only
|
||||
@controller.protected
|
||||
def validate_token(self, context, token_id):
|
||||
"""Check that a token is valid.
|
||||
|
||||
|
@ -561,8 +559,8 @@ class Auth(controller.V2Controller):
|
|||
self.assert_admin(context)
|
||||
self.token_api.delete_token(context=context, token_id=token_id)
|
||||
|
||||
@controller.protected
|
||||
def revocation_list(self, context, auth=None):
|
||||
self.assert_admin(context)
|
||||
tokens = self.token_api.list_revoked_tokens(context)
|
||||
|
||||
for t in tokens:
|
||||
|
|
|
@ -45,6 +45,12 @@ TENANTS = [
|
|||
'description': 'description',
|
||||
'enabled': True,
|
||||
'domain_id': DEFAULT_DOMAIN_ID
|
||||
}, {
|
||||
'id': 'service',
|
||||
'name': 'service',
|
||||
'description': 'description',
|
||||
'enabled': True,
|
||||
'domain_id': DEFAULT_DOMAIN_ID
|
||||
}
|
||||
]
|
||||
|
||||
|
@ -115,8 +121,12 @@ ROLES = [
|
|||
}, {
|
||||
'id': 'writer',
|
||||
'name': 'Writer',
|
||||
}, {
|
||||
'id': 'service',
|
||||
'name': 'Service',
|
||||
}
|
||||
|
||||
|
||||
]
|
||||
|
||||
DOMAINS = [
|
||||
|
|
|
@ -1598,7 +1598,7 @@ class IdentityTests(object):
|
|||
|
||||
def test_list_projects(self):
|
||||
projects = self.identity_api.list_projects()
|
||||
self.assertEquals(len(projects), 3)
|
||||
self.assertEquals(len(projects), 4)
|
||||
project_ids = []
|
||||
for project in projects:
|
||||
project_ids.append(project.get('id'))
|
||||
|
|
|
@ -232,8 +232,10 @@ class RestfulTestCase(test.TestCase):
|
|||
self.assertValidResponseHeaders(response)
|
||||
return response
|
||||
|
||||
def get_scoped_token(self):
|
||||
def get_scoped_token(self, tenant_id=None):
|
||||
"""Convenience method so that we can test authenticated requests."""
|
||||
if not tenant_id:
|
||||
tenant_id = self.tenant_bar['id']
|
||||
r = self.public_request(
|
||||
method='POST',
|
||||
path='/v2.0/tokens',
|
||||
|
@ -243,7 +245,7 @@ class RestfulTestCase(test.TestCase):
|
|||
'username': self.user_foo['name'],
|
||||
'password': self.user_foo['password'],
|
||||
},
|
||||
'tenantId': self.tenant_bar['id'],
|
||||
'tenantId': tenant_id,
|
||||
},
|
||||
})
|
||||
return self._get_token_id(r)
|
||||
|
@ -405,6 +407,18 @@ class CoreApiTests(object):
|
|||
token=token)
|
||||
self.assertValidAuthenticationResponse(r)
|
||||
|
||||
def test_validate_token_service_role(self):
|
||||
self.metadata_foobar = self.identity_api.update_metadata(
|
||||
self.user_foo['id'],
|
||||
self.tenant_service['id'],
|
||||
dict(roles=[self.role_service['id']]))
|
||||
|
||||
token = self.get_scoped_token(tenant_id='service')
|
||||
r = self.admin_request(
|
||||
path='/v2.0/tokens/%s' % token,
|
||||
token=token)
|
||||
self.assertValidAuthenticationResponse(r)
|
||||
|
||||
def test_validate_token_belongs_to(self):
|
||||
token = self.get_scoped_token()
|
||||
path = ('/v2.0/tokens/%s?belongsTo=%s' % (token,
|
||||
|
|
Loading…
Reference in New Issue