Add default RBAC personas to dynamic credentials

This change adds support to the dynamic credentials provider for nine
specific user personas that can be used to test service policies from
the point of view of the three scopes and three default roles that are
available out of the box in keystone. In addition to the os_admin,
os_primary, os_alt, and role-based credentials that were available
before, test classes can now access such credentials as os_system_admin
or os_domain_reader. An example of how this could be used is proposed
for keystone[1].

A subsequent patch addresses the pre-provisioned credentials provider.

In the future, the original tempest personas may redirect to the new
scope-aware personas in order to maintain compatibility between releases
once projects start enforcing scope. This is not addressed here.

[1] https://review.opendev.org/686305

Change-Id: I8bebb5b9b6d8da62e6a5268d827787da461cc0d6
This commit is contained in:
Colleen Murphy 2019-10-02 14:28:22 -07:00 committed by Ghanshyam
parent 5dbaaed88e
commit 06374e2dfd
4 changed files with 164 additions and 39 deletions

View File

@ -39,11 +39,15 @@ class CredsClient(object):
self.projects_client = projects_client
self.roles_client = roles_client
def create_user(self, username, password, project, email):
def create_user(self, username, password, project=None, email=None):
params = {'name': username,
'password': password,
self.project_id_param: project['id'],
'email': email}
'password': password}
# with keystone v3, a default project is not required
if project:
params[self.project_id_param] = project['id']
# email is not a first-class attribute of a user
if email:
params['email'] = email
user = self.users_client.create_user(**params)
if 'user' in user:
user = user['user']
@ -160,6 +164,15 @@ class V3CredsClient(CredsClient):
def delete_project(self, project_id):
self.projects_client.delete_project(project_id)
def create_domain(self, name, description):
domain = self.domains_client.create_domain(
name=name, description=description)['domain']
return domain
def delete_domain(self, domain_id):
self.domains_client.update_domain(domain_id, enabled=False)
self.domains_client.delete_domain(domain_id)
def get_credentials(
self, user, project, password, domain=None, system=None):
# User, project and domain already include both ID and name here,
@ -215,6 +228,23 @@ class V3CredsClient(CredsClient):
LOG.debug("Role %s already assigned on domain %s for user %s",
role['id'], domain['id'], user['id'])
def assign_user_role_on_system(self, user, role_name):
"""Assign the specified role on the system
:param user: a user dict
:param role_name: name of the role to be assigned
"""
role = self._check_role_exists(role_name)
if not role:
msg = 'No "%s" role found' % role_name
raise lib_exc.NotFound(msg)
try:
self.roles_client.create_user_role_on_system(
user['id'], role['id'])
except lib_exc.Conflict:
LOG.debug("Role %s already assigned on the system for user %s",
role['id'], user['id'])
def get_creds_client(identity_client,
projects_client,

View File

@ -164,62 +164,98 @@ class DynamicCredentialProvider(cred_provider.CredentialProvider):
os.network.PortsClient(),
os.network.SecurityGroupsClient())
def _create_creds(self, admin=False, roles=None):
def _create_creds(self, admin=False, roles=None, scope='project'):
"""Create credentials with random name.
Creates project and user. When admin flag is True create user
with admin role. Assign user with additional roles (for example
_member_) and roles requested by caller.
Creates user and role assignments on a project, domain, or system. When
the admin flag is True, creates user with the admin role on the
resource. If roles are provided, assigns those roles on the resource.
Otherwise, assigns the user the 'member' role on the resource.
:param admin: Flag if to assign to the user admin role
:type admin: bool
:param roles: Roles to assign for the user
:type roles: list
:param str scope: The scope for the role assignment, may be one of
'project', 'domain', or 'system'.
:return: Readonly Credentials with network resources
:raises: Exception if scope is invalid
"""
if not roles:
roles = []
root = self.name
project_name = data_utils.rand_name(root, prefix=self.resource_prefix)
project_desc = project_name + "-desc"
project = self.creds_client.create_project(
name=project_name, description=project_desc)
cred_params = {
'project': None,
'domain': None,
'system': None
}
if scope == 'project':
project_name = data_utils.rand_name(
root, prefix=self.resource_prefix)
project_desc = project_name + '-desc'
project = self.creds_client.create_project(
name=project_name, description=project_desc)
# NOTE(andreaf) User and project can be distinguished from the context,
# having the same ID in both makes it easier to match them and debug.
username = project_name
user_password = data_utils.rand_password()
email = data_utils.rand_name(
root, prefix=self.resource_prefix) + "@example.com"
user = self.creds_client.create_user(
username, user_password, project, email)
role_assigned = False
# NOTE(andreaf) User and project can be distinguished from the
# context, having the same ID in both makes it easier to match them
# and debug.
username = project_name + '-project'
cred_params['project'] = project
elif scope == 'domain':
domain_name = data_utils.rand_name(
root, prefix=self.resource_prefix)
domain_desc = domain_name + '-desc'
domain = self.creds_client.create_domain(
name=domain_name, description=domain_desc)
username = domain_name + '-domain'
cred_params['domain'] = domain
elif scope == 'system':
prefix = data_utils.rand_name(root, prefix=self.resource_prefix)
username = prefix + '-system'
cred_params['system'] = 'all'
else:
raise lib_exc.InvalidScopeType(scope=scope)
if admin:
self.creds_client.assign_user_role(user, project, self.admin_role)
role_assigned = True
username += '-admin'
elif roles and len(roles) == 1:
username += '-' + roles[0]
user_password = data_utils.rand_password()
cred_params['password'] = user_password
user = self.creds_client.create_user(
username, user_password)
cred_params['user'] = user
roles_to_assign = [r for r in roles]
if admin:
roles_to_assign.append(self.admin_role)
self.creds_client.assign_user_role(
user, project, self.identity_admin_role)
if (self.identity_version == 'v3' and
self.identity_admin_domain_scope):
self.creds_client.assign_user_role_on_domain(
user, self.identity_admin_role)
# Add roles specified in config file
for conf_role in self.extra_roles:
self.creds_client.assign_user_role(user, project, conf_role)
role_assigned = True
# Add roles requested by caller
if roles:
for role in roles:
self.creds_client.assign_user_role(user, project, role)
role_assigned = True
roles_to_assign.extend(self.extra_roles)
# If there are still no roles, default to 'member'
# NOTE(mtreinish) For a user to have access to a project with v3 auth
# it must beassigned a role on the project. So we need to ensure that
# our newly created user has a role on the newly created project.
if self.identity_version == 'v3' and not role_assigned:
if not roles_to_assign and self.identity_version == 'v3':
roles_to_assign = ['member']
try:
self.creds_client.create_user_role('member')
except lib_exc.Conflict:
LOG.warning('member role already exists, ignoring conflict.')
self.creds_client.assign_user_role(user, project, 'member')
for role in roles_to_assign:
if scope == 'project':
self.creds_client.assign_user_role(user, project, role)
elif scope == 'domain':
self.creds_client.assign_user_role_on_domain(
user, role, domain)
elif scope == 'system':
self.creds_client.assign_user_role_on_system(user, role)
creds = self.creds_client.get_credentials(user, project, user_password)
creds = self.creds_client.get_credentials(**cred_params)
return cred_provider.TestResources(creds)
def _create_network_resources(self, tenant_id):
@ -334,16 +370,26 @@ class DynamicCredentialProvider(cred_provider.CredentialProvider):
self.routers_admin_client.add_router_interface(router_id,
subnet_id=subnet_id)
def get_credentials(self, credential_type):
if self._creds.get(str(credential_type)):
def get_credentials(self, credential_type, scope=None):
if not scope and self._creds.get(str(credential_type)):
credentials = self._creds[str(credential_type)]
elif scope and self._creds.get("%s_%s" % (scope, credential_type[0])):
credentials = self._creds["%s_%s" % (scope, credential_type[0])]
else:
if credential_type in ['primary', 'alt', 'admin']:
is_admin = (credential_type == 'admin')
credentials = self._create_creds(admin=is_admin)
else:
credentials = self._create_creds(roles=credential_type)
self._creds[str(credential_type)] = credentials
if scope:
credentials = self._create_creds(
roles=credential_type, scope=scope)
else:
credentials = self._create_creds(roles=credential_type)
if scope:
self._creds["%s_%s" %
(scope, credential_type[0])] = credentials
else:
self._creds[str(credential_type)] = credentials
# Maintained until tests are ported
LOG.info("Acquired dynamic creds:\n"
" credentials: %s", credentials)
@ -365,6 +411,33 @@ class DynamicCredentialProvider(cred_provider.CredentialProvider):
def get_alt_creds(self):
return self.get_credentials('alt')
def get_system_admin_creds(self):
return self.get_credentials(['admin'], scope='system')
def get_system_member_creds(self):
return self.get_credentials(['member'], scope='system')
def get_system_reader_creds(self):
return self.get_credentials(['reader'], scope='system')
def get_domain_admin_creds(self):
return self.get_credentials(['admin'], scope='domain')
def get_domain_member_creds(self):
return self.get_credentials(['member'], scope='domain')
def get_domain_reader_creds(self):
return self.get_credentials(['reader'], scope='domain')
def get_project_admin_creds(self):
return self.get_credentials(['admin'], scope='project')
def get_project_member_creds(self):
return self.get_credentials(['member'], scope='project')
def get_project_reader_creds(self):
return self.get_credentials(['reader'], scope='project')
def get_creds_by_roles(self, roles, force_new=False):
roles = list(set(roles))
# The roles list as a str will become the index as the dict key for
@ -472,6 +545,16 @@ class DynamicCredentialProvider(cred_provider.CredentialProvider):
except lib_exc.NotFound:
LOG.warning("tenant with name: %s not found for delete",
creds.tenant_name)
# if cred is domain scoped, delete ephemeral domain
# do not delete default domain
if (hasattr(creds, 'domain_id') and
creds.domain_id != creds.project_domain_id):
try:
self.creds_client.delete_domain(creds.domain_id)
except lib_exc.NotFound:
LOG.warning("domain with name: %s not found for delete",
creds.domain_name)
self._creds = {}
def is_multi_user(self):

View File

@ -294,3 +294,7 @@ class ConsistencyGroupException(TempestException):
class ConsistencyGroupSnapshotException(TempestException):
message = ("Consistency group snapshot %(cgsnapshot_id)s failed and is "
"in ERROR status")
class InvalidScopeType(TempestException):
message = "Invalid scope %(scope)s"

View File

@ -257,7 +257,7 @@ class ServiceClients(object):
# class should only be used by tests hosted in Tempest.
@removals.removed_kwarg('client_parameters')
def __init__(self, credentials, identity_uri, region=None, scope='project',
def __init__(self, credentials, identity_uri, region=None, scope=None,
disable_ssl_certificate_validation=True, ca_certs=None,
trace_requests='', client_parameters=None, proxy_url=None):
"""Service Clients provider
@ -348,6 +348,14 @@ class ServiceClients(object):
self.ca_certs = ca_certs
self.trace_requests = trace_requests
self.proxy_url = proxy_url
if self.credentials.project_id or self.credentials.project_name:
scope = 'project'
elif self.credentials.system:
scope = 'system'
elif self.credentials.domain_id or self.credentials.domain_name:
scope = 'domain'
else:
scope = 'project'
# Creates an auth provider for the credentials
self.auth_provider = auth_provider_class(
self.credentials, self.identity_uri, scope=scope,