bp/cross-tenant-acls: allow tenantId:user, tenantName:user, and *:user ALCs

Change-Id: I7cfe77b3f03172814814f2e2bae04a3ae184efb0
This commit is contained in:
Guang Yee 2012-11-30 11:58:36 -08:00
parent 871f552ab6
commit 795acd40f1
2 changed files with 49 additions and 11 deletions

View File

@ -140,10 +140,34 @@ class KeystoneAuth(object):
"""Check reseller prefix.""" """Check reseller prefix."""
return account == self._get_account_for_tenant(tenant_id) return account == self._get_account_for_tenant(tenant_id)
def _authorize_cross_tenant(self, user, tenant_id, tenant_name, roles):
""" Check cross-tenant ACLs
Match tenant_id:user, tenant_name:user, and *:user.
:param user: The user name from the identity token.
:param tenant_id: The tenant ID from the identity token.
:param tenant_name: The tenant name from the identity token.
:param roles: The given container ACL.
:returns: True if tenant_id:user, tenant_name:user, or *:user matches
the given ACL. False otherwise.
"""
wildcard_tenant_match = '*:%s' % (user)
tenant_id_user_match = '%s:%s' % (tenant_id, user)
tenant_name_user_match = '%s:%s' % (tenant_name, user)
return (wildcard_tenant_match in roles
or tenant_id_user_match in roles
or tenant_name_user_match in roles)
def authorize(self, req): def authorize(self, req):
env = req.environ env = req.environ
env_identity = env.get('keystone.identity', {}) env_identity = env.get('keystone.identity', {})
tenant_id, tenant_name = env_identity.get('tenant') tenant_id, tenant_name = env_identity.get('tenant')
user = env_identity.get('user', '')
referrers, roles = swift_acl.parse_acl(getattr(req, 'acl', None))
try: try:
part = swift_utils.split_path(req.path, 1, 4, True) part = swift_utils.split_path(req.path, 1, 4, True)
@ -161,6 +185,13 @@ class KeystoneAuth(object):
req.environ['swift_owner'] = True req.environ['swift_owner'] = True
return return
# cross-tenant authorization
if self._authorize_cross_tenant(user, tenant_id, tenant_name, roles):
log_msg = 'user %s:%s, %s:%s, or *:%s allowed in ACL authorizing'
self.logger.debug(log_msg % (tenant_name, user,
tenant_id, user, user))
return
# Check if a user tries to access an account that does not match their # Check if a user tries to access an account that does not match their
# token # token
if not self._reseller_check(account, tenant_id): if not self._reseller_check(account, tenant_id):
@ -181,13 +212,10 @@ class KeystoneAuth(object):
return return
# If user is of the same name of the tenant then make owner of it. # If user is of the same name of the tenant then make owner of it.
user = env_identity.get('user', '')
if self.is_admin and user == tenant_name: if self.is_admin and user == tenant_name:
req.environ['swift_owner'] = True req.environ['swift_owner'] = True
return return
referrers, roles = swift_acl.parse_acl(getattr(req, 'acl', None))
authorized = self._authorize_unconfirmed_identity(req, obj, referrers, authorized = self._authorize_unconfirmed_identity(req, obj, referrers,
roles) roles)
if authorized: if authorized:
@ -195,14 +223,6 @@ class KeystoneAuth(object):
elif authorized is not None: elif authorized is not None:
return self.denied_response(req) return self.denied_response(req)
# Allow ACL at individual user level (tenant:user format)
# For backward compatibility, check for ACL in tenant_id:user format
if ('%s:%s' % (tenant_name, user) in roles
or '%s:%s' % (tenant_id, user) in roles):
log_msg = 'user %s:%s or %s:%s allowed in ACL authorizing'
self.logger.debug(log_msg % (tenant_name, user, tenant_id, user))
return
# Check if we have the role in the userroles and allow it # Check if we have the role in the userroles and allow it
for user_role in user_roles: for user_role in user_roles:
if user_role in roles: if user_role in roles:

View File

@ -227,5 +227,23 @@ class TestAuthorize(unittest.TestCase):
acl = '%s:%s' % (identity['tenant'][0], identity['user']) acl = '%s:%s' % (identity['tenant'][0], identity['user'])
self._check_authenticate(identity=identity, acl=acl) self._check_authenticate(identity=identity, acl=acl)
def test_authorize_succeeds_for_wildcard_tenant_user_in_roles(self):
identity = self._get_identity()
acl = '*:%s' % (identity['user'])
self._check_authenticate(identity=identity, acl=acl)
def test_cross_tenant_authorization_success(self):
self.assertTrue(self.test_auth._authorize_cross_tenant('userA',
'tenantID', 'tenantNAME', ['tenantID:userA']))
self.assertTrue(self.test_auth._authorize_cross_tenant('userA',
'tenantID', 'tenantNAME', ['tenantNAME:userA']))
self.assertTrue(self.test_auth._authorize_cross_tenant('userA',
'tenantID', 'tenantNAME', ['*:userA']))
def test_cross_tenant_authorization_failure(self):
self.assertFalse(self.test_auth._authorize_cross_tenant('userA',
'tenantID', 'tenantNAME', ['tenantXYZ:userA']))
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()