# tempest/tempest/api/identity/base.py (268 lines, 9.8 KiB, Python)

# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from tempest_lib.common.utils import data_utils
from tempest_lib import exceptions as lib_exc
from tempest import clients
from tempest.common import cred_provider
from tempest import config
import tempest.test
# Configuration object exposed by tempest.config; read below by the
# skip_checks() methods to see which identity API versions are enabled.
CONF = config.CONF
# Module-level logger, used by DataGenerator to report cleanup failures.
LOG = logging.getLogger(__name__)
class BaseIdentityAdminTest(tempest.test.BaseTestCase):
    """Shared base class for the identity admin API tests.

    Provides the admin and default client managers plus name-based lookup
    and disable helpers.  Every helper goes through ``cls.client``, which
    this class does not assign itself; the version-specific subclasses
    set it in their ``setup_clients``.
    """

    @classmethod
    def setup_credentials(cls):
        """Create the admin manager (``os_adm``) and default manager (``os``)."""
        super(BaseIdentityAdminTest, cls).setup_credentials()
        cls.os_adm = clients.AdminManager()
        cls.os = clients.Manager()

    @classmethod
    def disable_user(cls, user_name):
        """Disable the user whose name is ``user_name``."""
        target = cls.get_user_by_name(user_name)
        cls.client.enable_disable_user(target['id'], False)

    @classmethod
    def disable_tenant(cls, tenant_name):
        """Disable the tenant whose name is ``tenant_name``."""
        target = cls.get_tenant_by_name(tenant_name)
        cls.client.update_tenant(target['id'], enabled=False)

    @classmethod
    def get_user_by_name(cls, name):
        """Return the first user named ``name``, or None when none matches."""
        listing = cls.client.get_users()
        matches = [entry for entry in listing if entry['name'] == name]
        return matches[0] if matches else None

    @classmethod
    def get_tenant_by_name(cls, name):
        """Return the first tenant named ``name``, or None when none matches.

        ``list_tenants`` is tried first; if that raises AttributeError the
        listing is taken from ``list_projects`` instead.
        """
        try:
            listing = cls.client.list_tenants()
        except AttributeError:
            listing = cls.client.list_projects()
        matches = [entry for entry in listing if entry['name'] == name]
        return matches[0] if matches else None

    @classmethod
    def get_role_by_name(cls, name):
        """Return the first role named ``name``, or None when none matches."""
        listing = cls.client.list_roles()
        matches = [entry for entry in listing if entry['name'] == name]
        return matches[0] if matches else None
class BaseIdentityV2AdminTest(BaseIdentityAdminTest):
    """Base class for identity v2 admin API tests."""

    @classmethod
    def skip_checks(cls):
        """Skip the class when the identity v2 API is not enabled."""
        super(BaseIdentityV2AdminTest, cls).skip_checks()
        v2_enabled = CONF.identity_feature_enabled.api_v2
        if not v2_enabled:
            raise cls.skipException("Identity api v2 is not enabled")

    @classmethod
    def setup_clients(cls):
        """Bind the v2 identity and token clients.

        Raises the skip exception when the admin identity client reports
        that admin extensions are unavailable; in that case
        ``non_admin_client`` is left unset.
        """
        super(BaseIdentityV2AdminTest, cls).setup_clients()
        admin_manager = cls.os_adm
        cls.client = admin_manager.identity_client
        cls.token_client = admin_manager.token_client
        if not cls.client.has_admin_extensions():
            raise cls.skipException("Admin extensions disabled")
        cls.non_admin_client = cls.os.identity_client

    @classmethod
    def resource_setup(cls):
        """Create the DataGenerator that tracks resources made by tests."""
        super(BaseIdentityV2AdminTest, cls).resource_setup()
        cls.data = DataGenerator(cls.client)

    @classmethod
    def resource_cleanup(cls):
        """Delete everything tracked by ``cls.data``, then run the parent cleanup."""
        cls.data.teardown_all()
        super(BaseIdentityV2AdminTest, cls).resource_cleanup()
class BaseIdentityV3AdminTest(BaseIdentityAdminTest):
    """Base class for identity v3 admin API tests.

    ``get_user_by_name`` and ``get_role_by_name`` are inherited from
    BaseIdentityAdminTest; only ``get_tenant_by_name`` is overridden here
    so that it lists projects directly.
    """

    @classmethod
    def skip_checks(cls):
        """Skip the class when the identity v3 API is not enabled."""
        super(BaseIdentityV3AdminTest, cls).skip_checks()
        if not CONF.identity_feature_enabled.api_v3:
            raise cls.skipException("Identity api v3 is not enabled")

    @classmethod
    def setup_clients(cls):
        """Bind the v3 service clients and create the DataGenerator."""
        super(BaseIdentityV3AdminTest, cls).setup_clients()
        cls.client = cls.os_adm.identity_v3_client
        cls.token = cls.os_adm.token_v3_client
        cls.endpoints_client = cls.os_adm.endpoints_client
        cls.region_client = cls.os_adm.region_client
        cls.data = DataGenerator(cls.client)
        cls.service_client = cls.os_adm.service_client
        cls.policy_client = cls.os_adm.policy_client
        cls.creds_client = cls.os_adm.credentials_client
        # Assigned once; the only client here taken from the default
        # (non-admin) manager rather than the admin manager.
        cls.non_admin_client = cls.os.identity_v3_client

    @classmethod
    def resource_cleanup(cls):
        """Delete everything tracked by ``cls.data``, then run the parent cleanup."""
        cls.data.teardown_all()
        super(BaseIdentityV3AdminTest, cls).resource_cleanup()

    @classmethod
    def get_tenant_by_name(cls, name):
        """Return the first project named ``name``, or None when none matches.

        Unlike the inherited version this calls ``list_projects`` directly
        instead of trying ``list_tenants`` first.
        """
        projects = cls.client.list_projects()
        matches = [p for p in projects if p['name'] == name]
        return matches[0] if matches else None
class DataGenerator(object):
    """Create identity resources for tests and remember them for cleanup.

    Each ``setup_test_*`` method creates one resource through ``client``,
    stores it on the instance and appends it to the matching list.
    ``teardown_all`` walks those lists and deletes what was recorded.
    """

    def __init__(self, client):
        """Store ``client`` and start with empty resource lists."""
        self.client = client
        # Resources recorded by the v2-style setup methods.
        self.users = []
        self.tenants = []
        self.roles = []
        self.role_name = None
        # Resources recorded by the v3-style setup methods.
        self.v3_users = []
        self.projects = []
        self.v3_roles = []
        self.domains = []

    @property
    def test_credentials(self):
        """Return credentials built from the current test user and tenant.

        Reads ``test_user``, ``user``, ``test_password``, ``test_tenant``
        and ``tenant``, which are assigned by ``setup_test_user`` (not by
        ``__init__``), so that method has to run first.
        """
        return cred_provider.get_credentials(username=self.test_user,
                                             user_id=self.user['id'],
                                             password=self.test_password,
                                             tenant_name=self.test_tenant,
                                             tenant_id=self.tenant['id'])

    def setup_test_user(self):
        """Set up a test user, creating a fresh test tenant for it first."""
        self.setup_test_tenant()
        self.test_user = data_utils.rand_name('test_user_')
        self.test_password = data_utils.rand_name('pass_')
        self.test_email = self.test_user + '@testmail.tm'
        self.user = self.client.create_user(self.test_user,
                                            self.test_password,
                                            self.tenant['id'],
                                            self.test_email)
        self.users.append(self.user)

    def setup_test_tenant(self):
        """Set up a test tenant."""
        self.test_tenant = data_utils.rand_name('test_tenant_')
        self.test_description = data_utils.rand_name('desc_')
        self.tenant = self.client.create_tenant(
            name=self.test_tenant,
            description=self.test_description)
        self.tenants.append(self.tenant)

    def setup_test_role(self):
        """Set up a test role."""
        self.test_role = data_utils.rand_name('role')
        self.role = self.client.create_role(self.test_role)
        self.roles.append(self.role)

    def setup_test_v3_user(self):
        """Set up a test v3 user, creating a fresh test project for it first."""
        self.setup_test_project()
        self.test_user = data_utils.rand_name('test_user_')
        self.test_password = data_utils.rand_name('pass_')
        self.test_email = self.test_user + '@testmail.tm'
        self.v3_user = self.client.create_user(
            self.test_user,
            password=self.test_password,
            project_id=self.project['id'],
            email=self.test_email)
        self.v3_users.append(self.v3_user)

    def setup_test_project(self):
        """Set up a test project."""
        self.test_project = data_utils.rand_name('test_project_')
        self.test_description = data_utils.rand_name('desc_')
        self.project = self.client.create_project(
            name=self.test_project,
            description=self.test_description)
        self.projects.append(self.project)

    def setup_test_v3_role(self):
        """Set up a test v3 role."""
        self.test_role = data_utils.rand_name('role')
        self.v3_role = self.client.create_role(self.test_role)
        self.v3_roles.append(self.v3_role)

    def setup_test_domain(self):
        """Set up a test domain."""
        self.test_domain = data_utils.rand_name('test_domain')
        self.test_description = data_utils.rand_name('desc')
        self.domain = self.client.create_domain(
            name=self.test_domain,
            description=self.test_description)
        self.domains.append(self.domain)

    @staticmethod
    def _try_wrapper(func, item, **kwargs):
        """Call ``func(item['id'], **kwargs)`` without letting it fail cleanup.

        NotFound is ignored silently; any other Exception is logged with
        its traceback and then ignored, so the remaining resources still
        get processed.
        """
        try:
            # Unpacking an empty kwargs dict is the same as passing none,
            # so a single call covers both cases.
            func(item['id'], **kwargs)
        except lib_exc.NotFound:
            pass
        except Exception:
            # Hand the id to the logger as an argument so that the logging
            # framework performs the interpolation.
            LOG.exception("Unexpected exception occurred in %s deletion."
                          " But ignored here.", item['id'])

    def teardown_all(self):
        """Delete every resource recorded by the ``setup_test_*`` methods.

        Domains are updated with ``enabled=False`` before being deleted.
        """
        # NOTE(masayukig): v3 client doesn't have v2 method.
        # (e.g. delete_tenant) So the client method is only looked up
        # inside each loop body, i.e. when such a resource was recorded.
        for user in self.users:
            self._try_wrapper(self.client.delete_user, user)
        for tenant in self.tenants:
            self._try_wrapper(self.client.delete_tenant, tenant)
        for role in self.roles:
            self._try_wrapper(self.client.delete_role, role)
        for v3_user in self.v3_users:
            self._try_wrapper(self.client.delete_user, v3_user)
        for v3_project in self.projects:
            self._try_wrapper(self.client.delete_project, v3_project)
        for v3_role in self.v3_roles:
            self._try_wrapper(self.client.delete_role, v3_role)
        for domain in self.domains:
            self._try_wrapper(self.client.update_domain, domain,
                              enabled=False)
            self._try_wrapper(self.client.delete_domain, domain)