Drop OfficialClientManager and references to it

OfficialClientManager is unused now that scenario tests use
clients.Manager instead. Dropping the class, the hacking
rule that forbade its use in API tests and related unit tests.

Change-Id: Ib359b9ab0a903436d9d9c265e8c4d82bdd80a496
This commit is contained in:
Andrea Frittoli 2014-09-25 11:50:05 +01:00
parent ae9aca0cc0
commit 486ede779b
4 changed files with 1 additions and 322 deletions

View File

@ -13,9 +13,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import keystoneclient.exceptions
import keystoneclient.v2_0.client
from tempest import auth
from tempest.common import rest_client
from tempest import config
@ -488,290 +485,3 @@ class ComputeAdminManager(Manager):
credentials=auth.get_default_credentials('compute_admin'),
interface=interface,
service=service)
class OfficialClientManager(manager.Manager):
"""
Manager that provides access to the official python clients for
calling various OpenStack APIs.
"""
NOVACLIENT_VERSION = '2'
CINDERCLIENT_VERSION = '1'
HEATCLIENT_VERSION = '1'
IRONICCLIENT_VERSION = '1'
SAHARACLIENT_VERSION = '1.1'
CEILOMETERCLIENT_VERSION = '2'
def __init__(self, credentials):
# FIXME(andreaf) Auth provider for client_type 'official' is
# not implemented yet, setting to 'tempest' for now.
self.client_type = 'tempest'
self.interface = None
# super cares for credentials validation
super(OfficialClientManager, self).__init__(credentials=credentials)
self.baremetal_client = self._get_baremetal_client()
self.compute_client = self._get_compute_client(credentials)
self.identity_client = self._get_identity_client(credentials)
self.image_client = self._get_image_client()
self.network_client = self._get_network_client()
self.volume_client = self._get_volume_client(credentials)
self.object_storage_client = self._get_object_storage_client(
credentials)
self.orchestration_client = self._get_orchestration_client(
credentials)
self.data_processing_client = self._get_data_processing_client(
credentials)
self.ceilometer_client = self._get_ceilometer_client(
credentials)
def _get_roles(self):
admin_credentials = auth.get_default_credentials('identity_admin')
keystone_admin = self._get_identity_client(admin_credentials)
username = self.credentials.username
tenant_name = self.credentials.tenant_name
user_id = keystone_admin.users.find(name=username).id
tenant_id = keystone_admin.tenants.find(name=tenant_name).id
roles = keystone_admin.roles.roles_for_user(
user=user_id, tenant=tenant_id)
return [r.name for r in roles]
def _get_compute_client(self, credentials):
# Novaclient will not execute operations for anyone but the
# identified user, so a new client needs to be created for
# each user that operations need to be performed for.
if not CONF.service_available.nova:
return None
import novaclient.client
auth_url = CONF.identity.uri
dscv = CONF.identity.disable_ssl_certificate_validation
region = CONF.identity.region
client_args = (credentials.username, credentials.password,
credentials.tenant_name, auth_url)
# Create our default Nova client to use in testing
service_type = CONF.compute.catalog_type
endpoint_type = CONF.compute.endpoint_type
return novaclient.client.Client(self.NOVACLIENT_VERSION,
*client_args,
service_type=service_type,
endpoint_type=endpoint_type,
region_name=region,
no_cache=True,
insecure=dscv,
http_log_debug=True)
def _get_image_client(self):
if not CONF.service_available.glance:
return None
import glanceclient
token = self.identity_client.auth_token
region = CONF.identity.region
endpoint_type = CONF.image.endpoint_type
endpoint = self.identity_client.service_catalog.url_for(
attr='region', filter_value=region,
service_type=CONF.image.catalog_type, endpoint_type=endpoint_type)
dscv = CONF.identity.disable_ssl_certificate_validation
return glanceclient.Client('1', endpoint=endpoint, token=token,
insecure=dscv)
def _get_volume_client(self, credentials):
if not CONF.service_available.cinder:
return None
import cinderclient.client
auth_url = CONF.identity.uri
region = CONF.identity.region
endpoint_type = CONF.volume.endpoint_type
dscv = CONF.identity.disable_ssl_certificate_validation
return cinderclient.client.Client(self.CINDERCLIENT_VERSION,
credentials.username,
credentials.password,
credentials.tenant_name,
auth_url,
region_name=region,
endpoint_type=endpoint_type,
insecure=dscv,
http_log_debug=True)
def _get_object_storage_client(self, credentials):
if not CONF.service_available.swift:
return None
import swiftclient
auth_url = CONF.identity.uri
# add current tenant to swift operator role group.
admin_credentials = auth.get_default_credentials('identity_admin')
keystone_admin = self._get_identity_client(admin_credentials)
# enable test user to operate swift by adding operator role to him.
roles = keystone_admin.roles.list()
operator_role = CONF.object_storage.operator_role
member_role = [role for role in roles if role.name == operator_role][0]
# NOTE(maurosr): This is surrounded in the try-except block cause
# neutron tests doesn't have tenant isolation.
try:
keystone_admin.roles.add_user_role(self.identity_client.user_id,
member_role.id,
self.identity_client.tenant_id)
except keystoneclient.exceptions.Conflict:
pass
endpoint_type = CONF.object_storage.endpoint_type
os_options = {'endpoint_type': endpoint_type}
return swiftclient.Connection(auth_url, credentials.username,
credentials.password,
tenant_name=credentials.tenant_name,
auth_version='2',
os_options=os_options)
def _get_orchestration_client(self, credentials):
if not CONF.service_available.heat:
return None
import heatclient.client
keystone = self._get_identity_client(credentials)
region = CONF.identity.region
endpoint_type = CONF.orchestration.endpoint_type
token = keystone.auth_token
service_type = CONF.orchestration.catalog_type
try:
endpoint = keystone.service_catalog.url_for(
attr='region',
filter_value=region,
service_type=service_type,
endpoint_type=endpoint_type)
except keystoneclient.exceptions.EndpointNotFound:
return None
else:
return heatclient.client.Client(self.HEATCLIENT_VERSION,
endpoint,
token=token,
username=credentials.username,
password=credentials.password)
def _get_identity_client(self, credentials):
# This identity client is not intended to check the security
# of the identity service, so use admin credentials by default.
auth_url = CONF.identity.uri
dscv = CONF.identity.disable_ssl_certificate_validation
return keystoneclient.v2_0.client.Client(
username=credentials.username,
password=credentials.password,
tenant_name=credentials.tenant_name,
auth_url=auth_url,
insecure=dscv)
def _get_baremetal_client(self):
# ironic client is currently intended to by used by admin users
if not CONF.service_available.ironic:
return None
import ironicclient.client
roles = self._get_roles()
if CONF.identity.admin_role not in roles:
return None
auth_url = CONF.identity.uri
api_version = self.IRONICCLIENT_VERSION
insecure = CONF.identity.disable_ssl_certificate_validation
service_type = CONF.baremetal.catalog_type
endpoint_type = CONF.baremetal.endpoint_type
creds = {
'os_username': self.credentials.username,
'os_password': self.credentials.password,
'os_tenant_name': self.credentials.tenant_name
}
try:
return ironicclient.client.get_client(
api_version=api_version,
os_auth_url=auth_url,
insecure=insecure,
os_service_type=service_type,
os_endpoint_type=endpoint_type,
**creds)
except keystoneclient.exceptions.EndpointNotFound:
return None
def _get_network_client(self):
# The intended configuration is for the network client to have
# admin privileges and indicate for whom resources are being
# created via a 'tenant_id' parameter. This will often be
# preferable to authenticating as a specific user because
# working with certain resources (public routers and networks)
# often requires admin privileges anyway.
if not CONF.service_available.neutron:
return None
import neutronclient.v2_0.client
credentials = auth.get_default_credentials('identity_admin')
auth_url = CONF.identity.uri
dscv = CONF.identity.disable_ssl_certificate_validation
endpoint_type = CONF.network.endpoint_type
return neutronclient.v2_0.client.Client(
username=credentials.username,
password=credentials.password,
tenant_name=credentials.tenant_name,
endpoint_type=endpoint_type,
auth_url=auth_url,
insecure=dscv)
def _get_data_processing_client(self, credentials):
if not CONF.service_available.sahara:
# Sahara isn't available
return None
import saharaclient.client
endpoint_type = CONF.data_processing.endpoint_type
catalog_type = CONF.data_processing.catalog_type
auth_url = CONF.identity.uri
client = saharaclient.client.Client(
self.SAHARACLIENT_VERSION,
credentials.username,
credentials.password,
project_name=credentials.tenant_name,
endpoint_type=endpoint_type,
service_type=catalog_type,
auth_url=auth_url)
return client
def _get_ceilometer_client(self, credentials):
if not CONF.service_available.ceilometer:
return None
import ceilometerclient.client
keystone = self._get_identity_client(credentials)
region = CONF.identity.region
endpoint_type = CONF.telemetry.endpoint_type
service_type = CONF.telemetry.catalog_type
auth_url = CONF.identity.uri
try:
keystone.service_catalog.url_for(
attr='region',
filter_value=region,
service_type=service_type,
endpoint_type=endpoint_type)
except keystoneclient.exceptions.EndpointNotFound:
return None
else:
return ceilometerclient.client.get_client(
self.CEILOMETERCLIENT_VERSION,
os_username=credentials.username,
os_password=credentials.password,
os_tenant_name=credentials.tenant_name,
os_auth_url=auth_url,
os_service_type=service_type,
os_endpoint_type=endpoint_type)

View File

@ -106,20 +106,6 @@ def service_tags_not_in_module_path(physical_line, filename):
"T107: service tag should not be in path")
def no_official_client_manager_in_api_tests(physical_line, filename):
"""Check that the OfficialClientManager isn't used in the api tests
The api tests should not use the official clients.
T108: Can not use OfficialClientManager in the API tests
"""
if 'tempest/api' in filename:
if 'OfficialClientManager' in physical_line:
return (physical_line.find('OfficialClientManager'),
'T108: OfficialClientManager can not be used in the api '
'tests')
def no_mutable_default_args(logical_line):
"""Check that mutable object isn't used as default argument
@ -136,5 +122,4 @@ def factory(register):
register(no_setupclass_for_unit_tests)
register(no_vi_headers)
register(service_tags_not_in_module_path)
register(no_official_client_manager_in_api_tests)
register(no_mutable_default_args)

View File

@ -47,15 +47,7 @@ LOG_cinder_client.addHandler(log.NullHandler())
class ScenarioTest(tempest.test.BaseTestCase):
"""Replaces the OfficialClientTest base class.
Uses tempest own clients as opposed to OfficialClients.
Common differences:
- replace resource.attribute with resource['attribute']
- replace resouce.delete with delete_callable(resource['id'])
- replace local waiters with common / rest_client waiters
"""
"""Base class for scenario tests. Uses tempest own clients. """
@classmethod
def setUpClass(cls):

View File

@ -100,14 +100,6 @@ class HackingTestCase(base.TestCase):
self.assertFalse(checks.service_tags_not_in_module_path(
"@test.services('compute')", './tempest/api/image/fake_test.py'))
def test_no_official_client_manager_in_api_tests(self):
self.assertTrue(checks.no_official_client_manager_in_api_tests(
"cls.official_client = clients.OfficialClientManager(credentials)",
"tempest/api/compute/base.py"))
self.assertFalse(checks.no_official_client_manager_in_api_tests(
"cls.official_client = clients.OfficialClientManager(credentials)",
"tempest/scenario/fake_test.py"))
def test_no_mutable_default_args(self):
self.assertEqual(1, len(list(checks.no_mutable_default_args(
" def function1(para={}):"))))