Stephen Finucane f98006ca9d pre-commit: Migrate pyupgrade to ruff
Change-Id: Ic50d2a5e0bc9dcdfe29f382607135cab510cd396
Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
2024-09-12 18:08:23 +01:00

222 lines
7.9 KiB
Python

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import unittest
import fixtures
from tempest.lib.common.utils import data_utils
from tempest.lib import exceptions as tempest_exceptions
from openstackclient.tests.functional import base
BASIC_LIST_HEADERS = ['ID', 'Name']
class IdentityTests(base.TestCase):
"""Functional tests for Identity commands."""
USER_FIELDS = ['email', 'enabled', 'id', 'name', 'project_id', 'username']
PROJECT_FIELDS = ['enabled', 'id', 'name', 'description']
TOKEN_FIELDS = ['expires', 'id', 'project_id', 'user_id']
ROLE_FIELDS = ['id', 'name', 'domain_id']
SERVICE_FIELDS = ['id', 'enabled', 'name', 'type', 'description']
ENDPOINT_FIELDS = [
'id',
'region',
'service_id',
'service_name',
'service_type',
'publicurl',
'adminurl',
'internalurl',
]
EC2_CREDENTIALS_FIELDS = [
'access',
'project_id',
'secret',
'trust_id',
'user_id',
]
EC2_CREDENTIALS_LIST_HEADERS = [
'Access',
'Secret',
'Project ID',
'User ID',
]
CATALOG_LIST_HEADERS = ['Name', 'Type', 'Endpoints']
ENDPOINT_LIST_HEADERS = ['ID', 'Region', 'Service Name', 'Service Type']
@classmethod
def setUpClass(cls):
super().setUpClass()
# create dummy project
cls.project_name = data_utils.rand_name('TestProject')
cls.project_description = data_utils.rand_name('description')
try:
cls.openstack(
'--os-identity-api-version 2 '
'project create '
f'--description {cls.project_description} '
'--enable '
f'{cls.project_name}'
)
except tempest_exceptions.CommandFailed:
# Good chance this is due to Identity v2 admin not being enabled
# TODO(dtroyer): Actually determine if Identity v2 admin is
# enabled in the target cloud. Tuens out OSC
# doesn't make this easy as it should (yet).
raise unittest.case.SkipTest('No Identity v2 admin endpoint?')
@classmethod
def tearDownClass(cls):
try:
cls.openstack(
'--os-identity-api-version 2 '
f'project delete {cls.project_name}'
)
finally:
super().tearDownClass()
def setUp(self):
super().setUp()
# prepare v2 env
ver_fixture = fixtures.EnvironmentVariable(
'OS_IDENTITY_API_VERSION', '2.0'
)
self.useFixture(ver_fixture)
auth_url = os.environ.get('OS_AUTH_URL')
if auth_url:
auth_url_fixture = fixtures.EnvironmentVariable(
'OS_AUTH_URL', auth_url.replace('v3', 'v2.0')
)
self.useFixture(auth_url_fixture)
def _create_dummy_project(self, add_clean_up=True):
project_name = data_utils.rand_name('TestProject')
project_description = data_utils.rand_name('description')
raw_output = self.openstack(
'project create '
f'--description {project_description} '
f'--enable {project_name}'
)
project = self.parse_show_as_object(raw_output)
if add_clean_up:
self.addCleanup(
self.openstack, 'project delete {}'.format(project['id'])
)
items = self.parse_show(raw_output)
self.assert_show_fields(items, self.PROJECT_FIELDS)
return project_name
def _create_dummy_user(self, add_clean_up=True):
username = data_utils.rand_name('TestUser')
password = data_utils.rand_name('password')
email = data_utils.rand_name() + '@example.com'
raw_output = self.openstack(
'user create '
f'--project {self.project_name} '
f'--password {password} '
f'--email {email} '
'--enable '
f'{username}'
)
if add_clean_up:
self.addCleanup(
self.openstack,
'user delete {}'.format(
self.parse_show_as_object(raw_output)['id']
),
)
items = self.parse_show(raw_output)
self.assert_show_fields(items, self.USER_FIELDS)
return username
def _create_dummy_role(self, add_clean_up=True):
role_name = data_utils.rand_name('TestRole')
raw_output = self.openstack(f'role create {role_name}')
role = self.parse_show_as_object(raw_output)
if add_clean_up:
self.addCleanup(
self.openstack, 'role delete {}'.format(role['id'])
)
items = self.parse_show(raw_output)
self.assert_show_fields(items, self.ROLE_FIELDS)
self.assertEqual(role_name, role['name'])
return role_name
def _create_dummy_ec2_credentials(self, add_clean_up=True):
raw_output = self.openstack('ec2 credentials create')
ec2_credentials = self.parse_show_as_object(raw_output)
access_key = ec2_credentials['access']
if add_clean_up:
self.addCleanup(
self.openstack, f'ec2 credentials delete {access_key}'
)
items = self.parse_show(raw_output)
self.assert_show_fields(items, self.EC2_CREDENTIALS_FIELDS)
return access_key
def _create_dummy_token(self, add_clean_up=True):
raw_output = self.openstack('token issue')
token = self.parse_show_as_object(raw_output)
if add_clean_up:
self.addCleanup(
self.openstack, 'token revoke {}'.format(token['id'])
)
items = self.parse_show(raw_output)
self.assert_show_fields(items, self.TOKEN_FIELDS)
return token['id']
def _create_dummy_service(self, add_clean_up=True):
service_name = data_utils.rand_name('TestService')
description = data_utils.rand_name('description')
type_name = data_utils.rand_name('TestType')
raw_output = self.openstack(
'service create '
f'--name {service_name} '
f'--description {description} '
f'{type_name}'
)
if add_clean_up:
service = self.parse_show_as_object(raw_output)
self.addCleanup(
self.openstack, 'service delete {}'.format(service['id'])
)
items = self.parse_show(raw_output)
self.assert_show_fields(items, self.SERVICE_FIELDS)
return service_name
def _create_dummy_endpoint(self, add_clean_up=True):
region_id = data_utils.rand_name('TestRegion')
service_name = self._create_dummy_service()
public_url = data_utils.rand_url()
admin_url = data_utils.rand_url()
internal_url = data_utils.rand_url()
raw_output = self.openstack(
'endpoint create '
f'--publicurl {public_url} '
f'--adminurl {admin_url} '
f'--internalurl {internal_url} '
f'--region {region_id} '
f'{service_name}'
)
endpoint = self.parse_show_as_object(raw_output)
if add_clean_up:
self.addCleanup(
self.openstack, 'endpoint delete {}'.format(endpoint['id'])
)
items = self.parse_show(raw_output)
self.assert_show_fields(items, self.ENDPOINT_FIELDS)
return endpoint['id']