python-openstackclient/openstackclient/tests/functional/identity/v3/common.py

402 lines
16 KiB
Python

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import fixtures
from tempest.lib.common.utils import data_utils
from openstackclient.tests.functional import base
BASIC_LIST_HEADERS = ['ID', 'Name']
class IdentityTests(base.TestCase):
"""Functional tests for Identity commands. """
DOMAIN_FIELDS = ['description', 'enabled', 'id', 'name']
GROUP_FIELDS = ['description', 'domain_id', 'id', 'name']
TOKEN_FIELDS = ['expires', 'id', 'project_id', 'user_id']
USER_FIELDS = ['email', 'enabled', 'id', 'name', 'name',
'domain_id', 'default_project_id', 'description',
'password_expires_at']
PROJECT_FIELDS = ['description', 'id', 'domain_id', 'is_domain',
'enabled', 'name', 'parent_id']
ROLE_FIELDS = ['id', 'name', 'domain_id']
SERVICE_FIELDS = ['id', 'enabled', 'name', 'type', 'description']
REGION_FIELDS = ['description', 'enabled', 'parent_region', 'region']
ENDPOINT_FIELDS = ['id', 'region', 'region_id', 'service_id',
'service_name', 'service_type', 'enabled',
'interface', 'url']
REGION_LIST_HEADERS = ['Region', 'Parent Region', 'Description']
ENDPOINT_LIST_HEADERS = ['ID', 'Region', 'Service Name', 'Service Type',
'Enabled', 'Interface', 'URL']
ENDPOINT_LIST_PROJECT_HEADERS = ['ID', 'Name']
IDENTITY_PROVIDER_FIELDS = ['description', 'enabled', 'id', 'remote_ids',
'domain_id']
IDENTITY_PROVIDER_LIST_HEADERS = ['ID', 'Enabled', 'Description']
SERVICE_PROVIDER_FIELDS = ['auth_url', 'description', 'enabled',
'id', 'relay_state_prefix', 'sp_url']
SERVICE_PROVIDER_LIST_HEADERS = ['ID', 'Enabled', 'Description',
'Auth URL']
IMPLIED_ROLE_LIST_HEADERS = ['Prior Role ID', 'Prior Role Name',
'Implied Role ID', 'Implied Role Name']
REGISTERED_LIMIT_FIELDS = ['id', 'service_id', 'resource_name',
'default_limit', 'description', 'region_id']
REGISTERED_LIMIT_LIST_HEADERS = ['ID', 'Service ID', 'Resource Name',
'Default Limit', 'Description',
'Region ID']
LIMIT_FIELDS = ['id', 'project_id', 'service_id', 'resource_name',
'resource_limit', 'description', 'region_id']
LIMIT_LIST_HEADERS = ['ID', 'Project ID', 'Service ID', 'Resource Name',
'Resource Limit', 'Description', 'Region ID']
@classmethod
def setUpClass(cls):
super(IdentityTests, cls).setUpClass()
# create dummy domain
cls.domain_name = data_utils.rand_name('TestDomain')
cls.domain_description = data_utils.rand_name('description')
cls.openstack(
'--os-identity-api-version 3 '
'domain create '
'--description %(description)s '
'--enable '
'%(name)s' % {'description': cls.domain_description,
'name': cls.domain_name})
# create dummy project
cls.project_name = data_utils.rand_name('TestProject')
cls.project_description = data_utils.rand_name('description')
cls.openstack(
'--os-identity-api-version 3 '
'project create '
'--domain %(domain)s '
'--description %(description)s '
'--enable '
'%(name)s' % {'domain': cls.domain_name,
'description': cls.project_description,
'name': cls.project_name})
@classmethod
def tearDownClass(cls):
try:
# delete dummy project
cls.openstack('--os-identity-api-version 3 '
'project delete %s' % cls.project_name)
# disable and delete dummy domain
cls.openstack('--os-identity-api-version 3 '
'domain set --disable %s' % cls.domain_name)
cls.openstack('--os-identity-api-version 3 '
'domain delete %s' % cls.domain_name)
finally:
super(IdentityTests, cls).tearDownClass()
def setUp(self):
super(IdentityTests, self).setUp()
# prepare v3 env
ver_fixture = fixtures.EnvironmentVariable(
'OS_IDENTITY_API_VERSION', '3')
self.useFixture(ver_fixture)
auth_url = os.environ.get('OS_AUTH_URL')
if auth_url:
auth_url_fixture = fixtures.EnvironmentVariable(
'OS_AUTH_URL', auth_url.replace('v2.0', 'v3')
)
self.useFixture(auth_url_fixture)
def _create_dummy_user(self, add_clean_up=True):
username = data_utils.rand_name('TestUser')
password = data_utils.rand_name('password')
email = data_utils.rand_name() + '@example.com'
description = data_utils.rand_name('description')
raw_output = self.openstack(
'user create '
'--domain %(domain)s '
'--project %(project)s '
'--project-domain %(project_domain)s '
'--password %(password)s '
'--email %(email)s '
'--description %(description)s '
'--enable '
'%(name)s' % {'domain': self.domain_name,
'project': self.project_name,
'project_domain': self.domain_name,
'email': email,
'password': password,
'description': description,
'name': username})
if add_clean_up:
self.addCleanup(
self.openstack,
'user delete %s' % self.parse_show_as_object(raw_output)['id'])
items = self.parse_show(raw_output)
self.assert_show_fields(items, self.USER_FIELDS)
return username
def _create_dummy_role(self, add_clean_up=True):
role_name = data_utils.rand_name('TestRole')
raw_output = self.openstack('role create %s' % role_name)
role = self.parse_show_as_object(raw_output)
if add_clean_up:
self.addCleanup(
self.openstack,
'role delete %s' % role['id'])
items = self.parse_show(raw_output)
self.assert_show_fields(items, self.ROLE_FIELDS)
self.assertEqual(role_name, role['name'])
return role_name
def _create_dummy_implied_role(self, add_clean_up=True):
role_name = self._create_dummy_role(add_clean_up)
implied_role_name = self._create_dummy_role(add_clean_up)
self.openstack(
'implied role create '
'--implied-role %(implied_role)s '
'%(role)s' % {'implied_role': implied_role_name,
'role': role_name})
return implied_role_name, role_name
def _create_dummy_group(self, add_clean_up=True):
group_name = data_utils.rand_name('TestGroup')
description = data_utils.rand_name('description')
raw_output = self.openstack(
'group create '
'--domain %(domain)s '
'--description %(description)s '
'%(name)s' % {'domain': self.domain_name,
'description': description,
'name': group_name})
if add_clean_up:
self.addCleanup(
self.openstack,
'group delete '
'--domain %(domain)s '
'%(name)s' % {'domain': self.domain_name,
'name': group_name})
items = self.parse_show(raw_output)
self.assert_show_fields(items, self.GROUP_FIELDS)
return group_name
def _create_dummy_domain(self, add_clean_up=True):
domain_name = data_utils.rand_name('TestDomain')
domain_description = data_utils.rand_name('description')
self.openstack(
'domain create '
'--description %(description)s '
'--enable %(name)s' % {'description': domain_description,
'name': domain_name})
if add_clean_up:
self.addCleanup(
self.openstack,
'domain delete %s' % domain_name
)
self.addCleanup(
self.openstack,
'domain set --disable %s' % domain_name
)
return domain_name
def _create_dummy_project(self, add_clean_up=True):
project_name = data_utils.rand_name('TestProject')
project_description = data_utils.rand_name('description')
self.openstack(
'project create '
'--domain %(domain)s '
'--description %(description)s '
'--enable %(name)s' % {'domain': self.domain_name,
'description': project_description,
'name': project_name})
if add_clean_up:
self.addCleanup(
self.openstack,
'project delete '
'--domain %(domain)s '
'%(name)s' % {'domain': self.domain_name,
'name': project_name})
return project_name
def _create_dummy_region(self, parent_region=None, add_clean_up=True):
region_id = data_utils.rand_name('TestRegion')
description = data_utils.rand_name('description')
parent_region_arg = ''
if parent_region is not None:
parent_region_arg = '--parent-region %s' % parent_region
raw_output = self.openstack(
'region create '
'%(parent_region_arg)s '
'--description %(description)s '
'%(id)s' % {'parent_region_arg': parent_region_arg,
'description': description,
'id': region_id})
if add_clean_up:
self.addCleanup(self.openstack,
'region delete %s' % region_id)
items = self.parse_show(raw_output)
self.assert_show_fields(items, self.REGION_FIELDS)
return region_id
def _create_dummy_service(self, add_clean_up=True):
service_name = data_utils.rand_name('TestService')
description = data_utils.rand_name('description')
type_name = data_utils.rand_name('TestType')
raw_output = self.openstack(
'service create '
'--name %(name)s '
'--description %(description)s '
'--enable '
'%(type)s' % {'name': service_name,
'description': description,
'type': type_name})
if add_clean_up:
service = self.parse_show_as_object(raw_output)
self.addCleanup(self.openstack,
'service delete %s' % service['id'])
items = self.parse_show(raw_output)
self.assert_show_fields(items, self.SERVICE_FIELDS)
return service_name
def _create_dummy_endpoint(self, interface='public', add_clean_up=True):
region_id = self._create_dummy_region()
service_name = self._create_dummy_service()
endpoint_url = data_utils.rand_url()
raw_output = self.openstack(
'endpoint create '
'--region %(region)s '
'--enable '
'%(service)s '
'%(interface)s '
'%(url)s' % {'region': region_id,
'service': service_name,
'interface': interface,
'url': endpoint_url})
endpoint = self.parse_show_as_object(raw_output)
if add_clean_up:
self.addCleanup(
self.openstack,
'endpoint delete %s' % endpoint['id'])
items = self.parse_show(raw_output)
self.assert_show_fields(items, self.ENDPOINT_FIELDS)
return endpoint['id']
def _create_dummy_idp(self, add_clean_up=True):
identity_provider = data_utils.rand_name('IdentityProvider')
description = data_utils.rand_name('description')
raw_output = self.openstack(
'identity provider create '
' %(name)s '
'--description %(description)s '
'--enable ' % {'name': identity_provider,
'description': description})
if add_clean_up:
self.addCleanup(
self.openstack,
'identity provider delete %s' % identity_provider)
items = self.parse_show(raw_output)
self.assert_show_fields(items, self.IDENTITY_PROVIDER_FIELDS)
return identity_provider
def _create_dummy_sp(self, add_clean_up=True):
service_provider = data_utils.rand_name('ServiceProvider')
description = data_utils.rand_name('description')
raw_output = self.openstack(
'service provider create '
' %(name)s '
'--description %(description)s '
'--auth-url https://sp.example.com:35357 '
'--service-provider-url https://sp.example.com:5000 '
'--enable ' % {'name': service_provider,
'description': description})
if add_clean_up:
self.addCleanup(
self.openstack,
'service provider delete %s' % service_provider)
items = self.parse_show(raw_output)
self.assert_show_fields(items, self.SERVICE_PROVIDER_FIELDS)
return service_provider
def _create_dummy_registered_limit(self, add_clean_up=True):
service_name = self._create_dummy_service()
resource_name = data_utils.rand_name('resource_name')
params = {
'service_name': service_name,
'default_limit': 10,
'resource_name': resource_name
}
raw_output = self.openstack(
'registered limit create'
' --service %(service_name)s'
' --default-limit %(default_limit)s'
' %(resource_name)s' % params
)
items = self.parse_show(raw_output)
registered_limit_id = self._extract_value_from_items('id', items)
if add_clean_up:
self.addCleanup(
self.openstack,
'registered limit delete %s' % registered_limit_id
)
self.assert_show_fields(items, self.REGISTERED_LIMIT_FIELDS)
return registered_limit_id
def _extract_value_from_items(self, key, items):
for d in items:
for k, v in d.iteritems():
if k == key:
return v
def _create_dummy_limit(self, add_clean_up=True):
registered_limit_id = self._create_dummy_registered_limit()
raw_output = self.openstack(
'registered limit show %s' % registered_limit_id
)
items = self.parse_show(raw_output)
resource_name = self._extract_value_from_items('resource_name', items)
service_id = self._extract_value_from_items('service_id', items)
resource_limit = 15
project_name = self._create_dummy_project()
raw_output = self.openstack('project show %s' % project_name)
items = self.parse_show(raw_output)
project_id = self._extract_value_from_items('id', items)
params = {
'project_id': project_id,
'service_id': service_id,
'resource_name': resource_name,
'resource_limit': resource_limit
}
raw_output = self.openstack(
'limit create'
' --project %(project_id)s'
' --service %(service_id)s'
' --resource-limit %(resource_limit)s'
' %(resource_name)s' % params
)
items = self.parse_show(raw_output)
limit_id = self._extract_value_from_items('id', items)
if add_clean_up:
self.addCleanup(self.openstack, 'limit delete %s' % limit_id)
self.assert_show_fields(items, self.LIMIT_FIELDS)
return limit_id