Merge "Added Identity API tests for user ec2 credentials"
This commit is contained in:
commit
c6cc6dba8a
111
tempest/api/identity/v2/test_ec2_credentials.py
Normal file
111
tempest/api/identity/v2/test_ec2_credentials.py
Normal file
@ -0,0 +1,111 @@
|
||||
# Copyright 2015 OpenStack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from tempest_lib import exceptions as lib_exc
|
||||
|
||||
from tempest.api.identity import base
|
||||
from tempest import test
|
||||
|
||||
|
||||
class EC2CredentialsTest(base.BaseIdentityV2Test):
|
||||
|
||||
@classmethod
|
||||
def skip_checks(cls):
|
||||
super(EC2CredentialsTest, cls).skip_checks()
|
||||
if not test.is_extension_enabled('OS-EC2', 'identity'):
|
||||
msg = "OS-EC2 identity extension not enabled."
|
||||
raise cls.skipException(msg)
|
||||
|
||||
@classmethod
|
||||
def resource_setup(cls):
|
||||
super(EC2CredentialsTest, cls).resource_setup()
|
||||
cls.creds = cls.os.credentials
|
||||
|
||||
@test.idempotent_id('b580fab9-7ae9-46e8-8138-417260cb6f9f')
|
||||
def test_create_ec2_credentials(self):
|
||||
"""Create user ec2 credentials."""
|
||||
resp = self.non_admin_client.create_user_ec2_credentials(
|
||||
self.creds.credentials.user_id,
|
||||
self.creds.credentials.tenant_id)
|
||||
access = resp['access']
|
||||
self.addCleanup(
|
||||
self.non_admin_client.delete_user_ec2_credentials,
|
||||
self.creds.credentials.user_id, access)
|
||||
self.assertNotEmpty(resp['access'])
|
||||
self.assertNotEmpty(resp['secret'])
|
||||
self.assertEqual(self.creds.credentials.user_id, resp['user_id'])
|
||||
self.assertEqual(self.creds.credentials.tenant_id, resp['tenant_id'])
|
||||
|
||||
@test.idempotent_id('9e2ea42f-0a4f-468c-a768-51859ce492e0')
|
||||
def test_list_ec2_credentials(self):
|
||||
"""Get the list of user ec2 credentials."""
|
||||
created_creds = []
|
||||
fetched_creds = []
|
||||
# create first ec2 credentials
|
||||
creds1 = self.non_admin_client.create_user_ec2_credentials(
|
||||
self.creds.credentials.user_id, self.creds.credentials.tenant_id)
|
||||
created_creds.append(creds1['access'])
|
||||
# create second ec2 credentials
|
||||
creds2 = self.non_admin_client.create_user_ec2_credentials(
|
||||
self.creds.credentials.user_id, self.creds.credentials.tenant_id)
|
||||
created_creds.append(creds2['access'])
|
||||
# add credentials to be cleaned up
|
||||
self.addCleanup(
|
||||
self.non_admin_client.delete_user_ec2_credentials,
|
||||
self.creds.credentials.user_id, creds1['access'])
|
||||
self.addCleanup(
|
||||
self.non_admin_client.delete_user_ec2_credentials,
|
||||
self.creds.credentials.user_id, creds2['access'])
|
||||
# get the list of user ec2 credentials
|
||||
resp = self.non_admin_client.list_user_ec2_credentials(
|
||||
self.creds.credentials.user_id)
|
||||
fetched_creds = [cred['access'] for cred in resp]
|
||||
# created credentials should be in a fetched list
|
||||
missing = [cred for cred in created_creds
|
||||
if cred not in fetched_creds]
|
||||
self.assertEmpty(missing,
|
||||
"Failed to find ec2_credentials %s in fetched list" %
|
||||
', '.join(cred for cred in missing))
|
||||
|
||||
@test.idempotent_id('cb284075-b613-440d-83ca-fe0b33b3c2b8')
|
||||
def test_show_ec2_credentials(self):
|
||||
"""Get the definite user ec2 credentials."""
|
||||
resp = self.non_admin_client.create_user_ec2_credentials(
|
||||
self.creds.credentials.user_id,
|
||||
self.creds.credentials.tenant_id)
|
||||
self.addCleanup(
|
||||
self.non_admin_client.delete_user_ec2_credentials,
|
||||
self.creds.credentials.user_id, resp['access'])
|
||||
|
||||
ec2_creds = self.non_admin_client.show_user_ec2_credentials(
|
||||
self.creds.credentials.user_id, resp['access']
|
||||
)
|
||||
for key in ['access', 'secret', 'user_id', 'tenant_id']:
|
||||
self.assertEqual(ec2_creds[key], resp[key])
|
||||
|
||||
@test.idempotent_id('6aba0d4c-b76b-4e46-aa42-add79bc1551d')
|
||||
def test_delete_ec2_credentials(self):
|
||||
"""Delete user ec2 credentials."""
|
||||
resp = self.non_admin_client.create_user_ec2_credentials(
|
||||
self.creds.credentials.user_id,
|
||||
self.creds.credentials.tenant_id)
|
||||
access = resp['access']
|
||||
self.non_admin_client.delete_user_ec2_credentials(
|
||||
self.creds.credentials.user_id, access)
|
||||
self.assertRaises(
|
||||
lib_exc.NotFound,
|
||||
self.non_admin_client.show_user_ec2_credentials,
|
||||
self.creds.credentials.user_id,
|
||||
access)
|
@ -188,6 +188,12 @@ IdentityFeatureGroup = [
|
||||
cfg.BoolOpt('api_v3',
|
||||
default=True,
|
||||
help='Is the v3 identity API enabled'),
|
||||
cfg.ListOpt('api_extensions',
|
||||
default=['all'],
|
||||
help="A list of enabled identity extensions with a special "
|
||||
"entry all which indicates every extension is enabled. "
|
||||
"Empty list indicates all extensions are disabled. "
|
||||
"To get the list of extensions run: 'keystone discover'")
|
||||
]
|
||||
|
||||
compute_group = cfg.OptGroup(name='compute',
|
||||
|
@ -323,7 +323,19 @@ class IdentityClient(service_client.ServiceClient):
|
||||
self.expected_success(200, resp.status)
|
||||
return service_client.ResponseBody(resp, self._parse_resp(body))
|
||||
|
||||
def delete_user_ec2_credentials(self, user_id, access):
|
||||
resp, body = self.delete('/users/%s/credentials/OS-EC2/%s' %
|
||||
(user_id, access))
|
||||
self.expected_success(204, resp.status)
|
||||
return service_client.ResponseBody(resp, body)
|
||||
|
||||
def list_user_ec2_credentials(self, user_id):
|
||||
resp, body = self.get('/users/%s/credentials/OS-EC2' % user_id)
|
||||
self.expected_success(200, resp.status)
|
||||
return service_client.ResponseBodyList(resp, self._parse_resp(body))
|
||||
|
||||
def show_user_ec2_credentials(self, user_id, access):
|
||||
resp, body = self.get('/users/%s/credentials/OS-EC2/%s' %
|
||||
(user_id, access))
|
||||
self.expected_success(200, resp.status)
|
||||
return service_client.ResponseBody(resp, self._parse_resp(body))
|
||||
|
@ -183,6 +183,7 @@ def is_extension_enabled(extension_name, service):
|
||||
'volume': CONF.volume_feature_enabled.api_extensions,
|
||||
'network': CONF.network_feature_enabled.api_extensions,
|
||||
'object': CONF.object_storage_feature_enabled.discoverable_apis,
|
||||
'identity': CONF.identity_feature_enabled.api_extensions
|
||||
}
|
||||
if len(config_dict[service]) == 0:
|
||||
return False
|
||||
|
Loading…
x
Reference in New Issue
Block a user