@@ -121,3 +121,42 @@ class KeystoneBackend(object):
|
|||||||
|
|
||||||
LOG.debug('Authentication completed for user "%s".' % username)
|
LOG.debug('Authentication completed for user "%s".' % username)
|
||||||
return user
|
return user
|
||||||
|
|
||||||
|
def get_group_permissions(self, user, obj=None):
|
||||||
|
""" Returns an empty set since Keystone doesn't support "groups". """
|
||||||
|
return set()
|
||||||
|
|
||||||
|
def get_all_permissions(self, user, obj=None):
|
||||||
|
"""
|
||||||
|
Returns a set of permission strings that this user has through his/her
|
||||||
|
Keystone "roles".
|
||||||
|
|
||||||
|
The permissions are returned as ``"openstack.{{ role.name }}"``.
|
||||||
|
"""
|
||||||
|
if user.is_anonymous() or obj is not None:
|
||||||
|
return set()
|
||||||
|
# TODO: Integrate policy-driven RBAC when supported by Keystone.
|
||||||
|
role_perms = set(["openstack.roles.%s" % role['name'].lower()
|
||||||
|
for role in user.roles])
|
||||||
|
service_perms = set(["openstack.services.%s" % service['type'].lower()
|
||||||
|
for service in user.service_catalog])
|
||||||
|
return role_perms | service_perms
|
||||||
|
|
||||||
|
def has_perm(self, user, perm, obj=None):
|
||||||
|
""" Returns True if the given user has the specified permission. """
|
||||||
|
if not user.is_active:
|
||||||
|
return False
|
||||||
|
return perm in self.get_all_permissions(user, obj)
|
||||||
|
|
||||||
|
def has_module_perms(self, user, app_label):
|
||||||
|
"""
|
||||||
|
Returns True if user has any permissions in the given app_label.
|
||||||
|
|
||||||
|
Currently this matches for the app_label ``"openstack"``.
|
||||||
|
"""
|
||||||
|
if not user.is_active:
|
||||||
|
return False
|
||||||
|
for perm in self.get_all_permissions(user):
|
||||||
|
if perm[:perm.index('.')] == app_label:
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|||||||
@@ -89,6 +89,12 @@ class User(AnonymousUser):
|
|||||||
""" Checks for a valid token that has not yet expired. """
|
""" Checks for a valid token that has not yet expired. """
|
||||||
return self.token is not None and check_token_expiration(self.token)
|
return self.token is not None and check_token_expiration(self.token)
|
||||||
|
|
||||||
|
def is_anonymous(self):
|
||||||
|
"""
|
||||||
|
Returns ``True`` if the user is not authenticated,``False`` otherwise.
|
||||||
|
"""
|
||||||
|
return not self.is_authenticated()
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def is_active(self):
|
def is_active(self):
|
||||||
return self.enabled
|
return self.enabled
|
||||||
@@ -99,10 +105,7 @@ class User(AnonymousUser):
|
|||||||
Evaluates whether this user has admin privileges. Returns
|
Evaluates whether this user has admin privileges. Returns
|
||||||
``True`` or ``False``.
|
``True`` or ``False``.
|
||||||
"""
|
"""
|
||||||
for role in self.roles:
|
return 'admin' in [role['name'].lower() for role in self.roles]
|
||||||
if role['name'].lower() == 'admin':
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def authorized_tenants(self):
|
def authorized_tenants(self):
|
||||||
|
|||||||
Reference in New Issue
Block a user