Improve swift's keystoneauth ACL support

Below three bug reports talk about one thing.
Current keystoneauth ACL supports as:

tenant_name:user_id         ok
tenant_name:user_name       no
tenant_name:*               no
tenant_id:user_id           ok
tenant_id:user_name         no
tenant_id:*                 no
*:user_id                   ok
*:user_name                 no
*:*                         no

This patch will make all of above work fine.
Applying (tenant/user)name could let user put or get their data in a
more readable way. The tenant_name:* and *:user_name is suitable for
many usage.

note: to keep compatibility here add a new keystone.identity just for
authorize() itself and leave env['keystone.identity'] to other
middlerwares.

Fixes: bug #1020709
Fixes: bug #1075362
Fixes: bug #1155389
Change-Id: I9354dedaad875117f6a9072c67e9ecf69bfca77e
This commit is contained in:
Kun Huang 2013-03-26 13:20:09 +08:00
parent 5d52d2d1cc
commit 58a095b93e
2 changed files with 97 additions and 46 deletions

View File

@ -1,5 +1,4 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -119,6 +118,10 @@ class KeystoneAuth(object):
def _keystone_identity(self, environ):
"""Extract the identity from the Keystone auth component."""
# In next release, we would add user id in env['keystone.identity'] by
# using _integral_keystone_identity to replace current
# _keystone_identity. The purpose of keeping it in this release it for
# back compatibility.
if environ.get('HTTP_X_IDENTITY_STATUS') != 'Confirmed':
return
roles = []
@ -130,6 +133,20 @@ class KeystoneAuth(object):
'roles': roles}
return identity
def _integral_keystone_identity(self, environ):
"""Extract the identity from the Keystone auth component."""
if environ.get('HTTP_X_IDENTITY_STATUS') != 'Confirmed':
return
roles = []
if 'HTTP_X_ROLES' in environ:
roles = environ['HTTP_X_ROLES'].split(',')
identity = {'user': (environ.get('HTTP_X_USER_ID'),
environ.get('HTTP_X_USER_NAME')),
'tenant': (environ.get('HTTP_X_TENANT_ID'),
environ.get('HTTP_X_TENANT_NAME')),
'roles': roles}
return identity
def _get_account_for_tenant(self, tenant_id):
return '%s%s' % (self.reseller_prefix, tenant_id)
@ -137,33 +154,35 @@ class KeystoneAuth(object):
"""Check reseller prefix."""
return account == self._get_account_for_tenant(tenant_id)
def _authorize_cross_tenant(self, user, tenant_id, tenant_name, roles):
def _authorize_cross_tenant(self, user_id, user_name,
tenant_id, tenant_name, roles):
""" Check cross-tenant ACLs
Match tenant_id:user, tenant_name:user, and *:user.
Match tenant:user, tenant and user could be its id, name or '*'
:param user: The user name from the identity token.
:param user_id: The user id from the identity token.
:param user_name: The user name from the identity token.
:param tenant_id: The tenant ID from the identity token.
:param tenant_name: The tenant name from the identity token.
:param roles: The given container ACL.
:returns: True if tenant_id:user, tenant_name:user, or *:user matches
the given ACL. False otherwise.
:returns: matched string if tenant(name/id/*):user(name/id/*) matches
the given ACL.
None otherwise.
"""
wildcard_tenant_match = '*:%s' % (user)
tenant_id_user_match = '%s:%s' % (tenant_id, user)
tenant_name_user_match = '%s:%s' % (tenant_name, user)
return (wildcard_tenant_match in roles
or tenant_id_user_match in roles
or tenant_name_user_match in roles)
for tenant in [tenant_id, tenant_name, '*']:
for user in [user_id, user_name, '*']:
s = '%s:%s' % (tenant, user)
if s in roles:
return s
return None
def authorize(self, req):
env = req.environ
env_identity = env.get('keystone.identity', {})
tenant_id, tenant_name = env_identity.get('tenant')
user = env_identity.get('user', '')
env_identity = self._integral_keystone_identity(env)
tenant_id, tenant_name = env_identity['tenant']
user_id, user_name = env_identity['user']
referrers, roles = swift_acl.parse_acl(getattr(req, 'acl', None))
#allow OPTIONS requests to proceed as normal
@ -187,10 +206,12 @@ class KeystoneAuth(object):
return
# cross-tenant authorization
if self._authorize_cross_tenant(user, tenant_id, tenant_name, roles):
log_msg = 'user %s:%s, %s:%s, or *:%s allowed in ACL authorizing'
self.logger.debug(log_msg % (tenant_name, user,
tenant_id, user, user))
matched_acl = self._authorize_cross_tenant(user_id, user_name,
tenant_id, tenant_name,
roles)
if matched_acl is not None:
log_msg = 'user %s allowed in ACL authorizing.' % matched_acl
self.logger.debug(log_msg)
return
acl_authorized = self._authorize_unconfirmed_identity(req, obj,
@ -219,7 +240,7 @@ class KeystoneAuth(object):
return
# If user is of the same name of the tenant then make owner of it.
if self.is_admin and user == tenant_name:
if self.is_admin and user_name == tenant_name:
self.logger.warning("the is_admin feature has been deprecated "
"and will be removed in the future "
"update your config file")
@ -233,7 +254,8 @@ class KeystoneAuth(object):
for user_role in user_roles:
if user_role in (r.lower() for r in roles):
log_msg = 'user %s:%s allowed in ACL: %s authorizing'
self.logger.debug(log_msg % (tenant_name, user, user_role))
self.logger.debug(log_msg % (tenant_name, user_name,
user_role))
return
return self.denied_response(req)

View File

@ -164,13 +164,18 @@ class TestAuthorize(unittest.TestCase):
def _get_account(self, identity=None):
if not identity:
identity = self._get_identity()
return self.test_auth._get_account_for_tenant(identity['tenant'][0])
return self.test_auth._get_account_for_tenant(identity['HTTP_X_TENANT_ID'])
def _get_identity(self, tenant_id='tenant_id',
tenant_name='tenant_name', user='user', roles=None):
if not roles:
roles = []
return dict(tenant=(tenant_id, tenant_name), user=user, roles=roles)
def _get_identity(self, tenant_id='tenant_id', tenant_name='tenant_name',
user_id='user_id', user_name='user_name', roles=[]):
if isinstance(roles, list):
roles = ','.join(roles)
return {'HTTP_X_USER_ID': user_id,
'HTTP_X_USER_NAME': user_name,
'HTTP_X_TENANT_ID': tenant_id,
'HTTP_X_TENANT_NAME': tenant_name,
'HTTP_X_ROLES': roles,
'HTTP_X_IDENTITY_STATUS': 'Confirmed'}
def _check_authenticate(self, account=None, identity=None, headers=None,
exception=None, acl=None, env=None, path=None):
@ -180,8 +185,8 @@ class TestAuthorize(unittest.TestCase):
account = self._get_account(identity)
if not path:
path = '/v1/%s/c' % account
default_env = {'keystone.identity': identity,
'REMOTE_USER': identity['tenant']}
default_env = {'REMOTE_USER': identity['HTTP_X_TENANT_ID']}
default_env.update(identity)
if env:
default_env.update(env)
req = self._make_request(path, headers=headers, environ=default_env)
@ -225,8 +230,7 @@ class TestAuthorize(unittest.TestCase):
self.assertTrue(req.environ.get('swift_owner'))
def _check_authorize_for_tenant_owner_match(self, exception=None):
identity = self._get_identity()
identity['user'] = identity['tenant'][1]
identity = self._get_identity(user_name='same_name', tenant_name='same_name')
req = self._check_authenticate(identity=identity, exception=exception)
expected = bool(exception is None)
self.assertEqual(bool(req.environ.get('swift_owner')), expected)
@ -271,30 +275,55 @@ class TestAuthorize(unittest.TestCase):
def test_authorize_succeeds_for_tenant_name_user_in_roles(self):
identity = self._get_identity()
acl = '%s:%s' % (identity['tenant'][1], identity['user'])
self._check_authenticate(identity=identity, acl=acl)
user_name = identity['HTTP_X_USER_NAME']
user_id = identity['HTTP_X_USER_ID']
tenant_id = identity['HTTP_X_TENANT_ID']
for user in [user_id, user_name, '*']:
acl = '%s:%s' % (tenant_id, user)
self._check_authenticate(identity=identity, acl=acl)
def test_authorize_succeeds_for_tenant_id_user_in_roles(self):
identity = self._get_identity()
acl = '%s:%s' % (identity['tenant'][0], identity['user'])
self._check_authenticate(identity=identity, acl=acl)
user_name = identity['HTTP_X_USER_NAME']
user_id = identity['HTTP_X_USER_ID']
tenant_name = identity['HTTP_X_TENANT_NAME']
for user in [user_id, user_name, '*']:
acl = '%s:%s' % (tenant_name, user)
self._check_authenticate(identity=identity, acl=acl)
def test_authorize_succeeds_for_wildcard_tenant_user_in_roles(self):
identity = self._get_identity()
acl = '*:%s' % (identity['user'])
self._check_authenticate(identity=identity, acl=acl)
user_name = identity['HTTP_X_USER_NAME']
user_id = identity['HTTP_X_USER_ID']
for user in [user_id, user_name, '*']:
acl = '*:%s' % user
self._check_authenticate(identity=identity, acl=acl)
def test_cross_tenant_authorization_success(self):
self.assertTrue(self.test_auth._authorize_cross_tenant('userA',
'tenantID', 'tenantNAME', ['tenantID:userA']))
self.assertTrue(self.test_auth._authorize_cross_tenant('userA',
'tenantID', 'tenantNAME', ['tenantNAME:userA']))
self.assertTrue(self.test_auth._authorize_cross_tenant('userA',
'tenantID', 'tenantNAME', ['*:userA']))
self.assertEqual(self.test_auth._authorize_cross_tenant('userID',
'userA', 'tenantID', 'tenantNAME', ['tenantID:userA']), 'tenantID:userA')
self.assertEqual(self.test_auth._authorize_cross_tenant('userID',
'userA', 'tenantID', 'tenantNAME', ['tenantNAME:userA']), 'tenantNAME:userA')
self.assertEqual(self.test_auth._authorize_cross_tenant('userID',
'userA', 'tenantID', 'tenantNAME', ['*:userA']), '*:userA')
self.assertEqual(self.test_auth._authorize_cross_tenant('userID',
'userA', 'tenantID', 'tenantNAME', ['tenantID:userID']), 'tenantID:userID')
self.assertEqual(self.test_auth._authorize_cross_tenant('userID',
'userA', 'tenantID', 'tenantNAME', ['tenantNAME:userID']), 'tenantNAME:userID')
self.assertEqual(self.test_auth._authorize_cross_tenant('userID',
'userA', 'tenantID', 'tenantNAME', ['*:userID']), '*:userID')
self.assertEqual(self.test_auth._authorize_cross_tenant('userID',
'userA', 'tenantID', 'tenantNAME', ['tenantID:*']), 'tenantID:*')
self.assertEqual(self.test_auth._authorize_cross_tenant('userID',
'userA', 'tenantID', 'tenantNAME', ['tenantNAME:*']), 'tenantNAME:*')
self.assertEqual(self.test_auth._authorize_cross_tenant('userID',
'userA', 'tenantID', 'tenantNAME', ['*:*']), '*:*')
def test_cross_tenant_authorization_failure(self):
self.assertFalse(self.test_auth._authorize_cross_tenant('userA',
'tenantID', 'tenantNAME', ['tenantXYZ:userA']))
self.assertEqual(self.test_auth._authorize_cross_tenant('userID',
'userA', 'tenantID', 'tenantNAME', ['tenantXYZ:userA']), None)
if __name__ == '__main__':