Adds an identity admin client and API tests for keystone roles.

Added a config option for the [identity] section
     * catalog type - to specify endpoints for the Identity service

    Fixes bug 902389

Change-Id: I429d8bbfe3e6de8432a1a7b79a8676c63925f78f
This commit is contained in:
chris fattarsi 2012-04-30 14:11:27 -07:00
parent ad6feca97e
commit 8ed39ac8b4
9 changed files with 148 additions and 6 deletions

View File

@ -3,6 +3,10 @@
# test clients use when authenticating with different user/tenant
# combinations
# The type of endpoint for a Identity service. Unless you have a
# custom Keystone service catalog implementation, you probably want to leave
# this value as "identity"
catalog_type = identity
# Set to True if your test environment's Keystone authentication service should
# be accessed over HTTPS
use_ssl = False

View File

@ -132,12 +132,15 @@ class RestClient(object):
if mgmt_url == None:
raise exceptions.EndpointNotFound(service)
#TODO (dwalleck): This is a horrible stopgap.
#Need to join strings more cleanly
temp = mgmt_url.rsplit('/')
service_url = temp[0] + '//' + temp[2] + '/' + temp[3] + '/'
management_url = service_url + tenant_id
return token, management_url
if mgmt_url.endswith(tenant_id):
return token, mgmt_url
else:
#TODO (dwalleck): This is a horrible stopgap.
#Need to join strings more cleanly
temp = mgmt_url.rsplit('/')
service_url = temp[0] + '//' + temp[2] + '/' + temp[3] + '/'
management_url = service_url + tenant_id
return token, management_url
elif resp.status == 401:
raise exceptions.AuthenticationFailure(user=user,
password=password)

View File

@ -44,6 +44,11 @@ class IdentityConfig(BaseConfig):
SECTION_NAME = "identity"
@property
def catalog_type(self):
"""Catalog type of the Identity service."""
return self.get("catalog_type", 'identity')
@property
def host(self):
"""Host IP for making Identity API requests."""

View File

@ -30,6 +30,7 @@ import SecurityGroupsClient
from tempest.services.nova.json.floating_ips_client import FloatingIPsClient
from tempest.services.nova.json.keypairs_client import KeyPairsClient
from tempest.services.nova.json.volumes_client import VolumesClient
from tempest.services.identity.json.admin_client import AdminClient
LOG = logging.getLogger(__name__)
@ -79,6 +80,7 @@ class Manager(object):
self.security_groups_client = SecurityGroupsClient(*client_args)
self.floating_ips_client = FloatingIPsClient(*client_args)
self.volumes_client = VolumesClient(*client_args)
self.admin_client = AdminClient(*client_args)
class AltManager(Manager):

View File

View File

@ -0,0 +1,44 @@
from tempest.common.rest_client import RestClient
import json
class AdminClient(RestClient):
def __init__(self, config, username, password, auth_url, tenant_name=None):
super(AdminClient, self).__init__(config, username, password,
auth_url, tenant_name)
self.service = self.config.identity.catalog_type
self.endpoint_url = 'adminURL'
def has_admin_extensions(self):
"""
Returns True if the KSADM Admin Extensions are supported
False otherwise
"""
if hasattr(self, '_has_admin_extensions'):
return self._has_admin_extensions
resp, body = self.list_roles()
self._has_admin_extensions = ('status' in resp and resp.status != 503)
return self._has_admin_extensions
def create_role(self, name):
"""Create a role"""
post_body = {
'name': name,
}
post_body = json.dumps({'role': post_body})
resp, body = self.post('OS-KSADM/roles', post_body,
self.headers)
body = json.loads(body)
return resp, body['role']
def delete_role(self, role_id):
"""Delete a role"""
resp, body = self.delete('OS-KSADM/roles/%s' % role_id)
return resp, body
def list_roles(self):
"""Returns roles"""
resp, body = self.get('OS-KSADM/roles')
body = json.loads(body)
return resp, body['roles']

View File

View File

@ -0,0 +1,84 @@
import unittest2 as unittest
import nose
from tempest import openstack
from tempest import exceptions
from tempest.common.utils.data_utils import rand_name
from tempest.tests import utils
class RolesTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.os = openstack.AdminManager()
cls.client = cls.os.admin_client
cls.config = cls.os.config
if not cls.client.has_admin_extensions():
raise nose.SkipTest("Admin extensions disabled")
cls.roles = []
for _ in xrange(5):
resp, body = cls.client.create_role(rand_name('role-'))
cls.roles.append(body['id'])
@classmethod
def tearDownClass(cls):
for role in cls.roles:
cls.client.delete_role(role)
def test_list_roles(self):
"""Return a list of all roles"""
resp, body = self.client.list_roles()
found = [role for role in body if role['id'] in self.roles]
self.assertTrue(any(found))
self.assertEqual(len(found), len(self.roles))
def test_role_create_delete(self):
"""Role should be created, verified, and deleted"""
role_name = rand_name('role-test-')
resp, body = self.client.create_role(role_name)
self.assertTrue('status' in resp)
self.assertTrue(resp['status'].startswith('2'))
self.assertEqual(role_name, body['name'])
resp, body = self.client.list_roles()
found = [role for role in body if role['name'] == role_name]
self.assertTrue(any(found))
resp, body = self.client.delete_role(found[0]['id'])
self.assertTrue('status' in resp)
self.assertTrue(resp['status'].startswith('2'))
resp, body = self.client.list_roles()
found = [role for role in body if role['name'] == role_name]
self.assertFalse(any(found))
def test_role_create_blank_name(self):
"""Should not be able to create a role with a blank name"""
try:
resp, body = self.client.create_role('')
except exceptions.Duplicate:
self.fail('A role with a blank name already exists.')
self.assertTrue('status' in resp)
self.assertFalse(resp['status'].startswith('2'), 'Create role with '
'empty name should fail')
def test_role_create_duplicate(self):
"""Role names should be unique"""
role_name = rand_name('role-dup-')
resp, body = self.client.create_role(role_name)
role1_id = body.get('id')
self.assertTrue('status' in resp)
self.assertTrue(resp['status'].startswith('2'))
try:
resp, body = self.client.create_role(role_name)
# this should raise an exception
self.fail('Should not be able to create a duplicate role name.'
' %s' % role_name)
except exceptions.Duplicate:
pass
self.client.delete_role(role1_id)