Refactor Managers to a common base class

Multiversion auth part3

Refactor client managers to inherit from a common manager.Manager
class. Moves scenario base manager to clients.py.

Partially implements: bp multi-keystone-api-version-tests

Change-Id: Iddacbaa4593b7cb4d32538a5cade814751c180e0
This commit is contained in:
Andrea Frittoli 2014-02-18 09:57:04 +00:00
parent 6fc5a1d65b
commit f9cde7e942
6 changed files with 356 additions and 330 deletions

View File

@ -13,10 +13,20 @@
# License for the specific language governing permissions and limitations
# under the License.
from tempest import auth
# Default client libs
import cinderclient.client
import glanceclient
import heatclient.client
import keystoneclient.exceptions
import keystoneclient.v2_0.client
import neutronclient.v2_0.client
import novaclient.client
import swiftclient
from tempest.common.rest_client import NegativeRestClient
from tempest import config
from tempest import exceptions
from tempest import manager
from tempest.openstack.common import log as logging
from tempest.services.baremetal.v1.client_json import BaremetalClientJSON
from tempest.services import botoclients
@ -169,10 +179,10 @@ CONF = config.CONF
LOG = logging.getLogger(__name__)
class Manager(object):
class Manager(manager.Manager):
"""
Top level manager for OpenStack Compute clients
Top level manager for OpenStack tempest clients
"""
def __init__(self, username=None, password=None, tenant_name=None,
@ -187,153 +197,145 @@ class Manager(object):
:param tenant_name: Override of the tenant name
"""
self.interface = interface
self.auth_version = CONF.identity.auth_version
# FIXME(andreaf) Change Manager __init__ to accept a credentials dict
if username is None or password is None:
# Tenant None is a valid use case
self.credentials = self.get_default_credentials()
else:
self.credentials = dict(username=username, password=password,
tenant_name=tenant_name)
if self.auth_version == 'v3':
self.credentials['domain_name'] = 'Default'
# Setup an auth provider
auth_provider = self.get_auth_provider(self.credentials)
self.client_type = 'tempest'
# super cares for credentials validation
super(Manager, self).__init__(
username=username, password=password, tenant_name=tenant_name)
if self.interface == 'xml':
self.certificates_client = CertificatesClientXML(
auth_provider)
self.servers_client = ServersClientXML(auth_provider)
self.limits_client = LimitsClientXML(auth_provider)
self.images_client = ImagesClientXML(auth_provider)
self.keypairs_client = KeyPairsClientXML(auth_provider)
self.quotas_client = QuotasClientXML(auth_provider)
self.flavors_client = FlavorsClientXML(auth_provider)
self.extensions_client = ExtensionsClientXML(auth_provider)
self.auth_provider)
self.servers_client = ServersClientXML(self.auth_provider)
self.limits_client = LimitsClientXML(self.auth_provider)
self.images_client = ImagesClientXML(self.auth_provider)
self.keypairs_client = KeyPairsClientXML(self.auth_provider)
self.quotas_client = QuotasClientXML(self.auth_provider)
self.flavors_client = FlavorsClientXML(self.auth_provider)
self.extensions_client = ExtensionsClientXML(self.auth_provider)
self.volumes_extensions_client = VolumesExtensionsClientXML(
auth_provider)
self.auth_provider)
self.floating_ips_client = FloatingIPsClientXML(
auth_provider)
self.backups_client = BackupsClientXML(auth_provider)
self.snapshots_client = SnapshotsClientXML(auth_provider)
self.volumes_client = VolumesClientXML(auth_provider)
self.volumes_v2_client = VolumesV2ClientXML(auth_provider)
self.auth_provider)
self.backups_client = BackupsClientXML(self.auth_provider)
self.snapshots_client = SnapshotsClientXML(self.auth_provider)
self.volumes_client = VolumesClientXML(self.auth_provider)
self.volumes_v2_client = VolumesV2ClientXML(self.auth_provider)
self.volume_types_client = VolumeTypesClientXML(
auth_provider)
self.identity_client = IdentityClientXML(auth_provider)
self.auth_provider)
self.identity_client = IdentityClientXML(self.auth_provider)
self.identity_v3_client = IdentityV3ClientXML(
auth_provider)
self.auth_provider)
self.security_groups_client = SecurityGroupsClientXML(
auth_provider)
self.interfaces_client = InterfacesClientXML(auth_provider)
self.endpoints_client = EndPointClientXML(auth_provider)
self.fixed_ips_client = FixedIPsClientXML(auth_provider)
self.auth_provider)
self.interfaces_client = InterfacesClientXML(self.auth_provider)
self.endpoints_client = EndPointClientXML(self.auth_provider)
self.fixed_ips_client = FixedIPsClientXML(self.auth_provider)
self.availability_zone_client = AvailabilityZoneClientXML(
auth_provider)
self.service_client = ServiceClientXML(auth_provider)
self.aggregates_client = AggregatesClientXML(auth_provider)
self.services_client = ServicesClientXML(auth_provider)
self.auth_provider)
self.service_client = ServiceClientXML(self.auth_provider)
self.aggregates_client = AggregatesClientXML(self.auth_provider)
self.services_client = ServicesClientXML(self.auth_provider)
self.tenant_usages_client = TenantUsagesClientXML(
auth_provider)
self.policy_client = PolicyClientXML(auth_provider)
self.hosts_client = HostsClientXML(auth_provider)
self.hypervisor_client = HypervisorClientXML(auth_provider)
self.network_client = NetworkClientXML(auth_provider)
self.auth_provider)
self.policy_client = PolicyClientXML(self.auth_provider)
self.hosts_client = HostsClientXML(self.auth_provider)
self.hypervisor_client = HypervisorClientXML(self.auth_provider)
self.network_client = NetworkClientXML(self.auth_provider)
self.credentials_client = CredentialsClientXML(
auth_provider)
self.auth_provider)
self.instance_usages_audit_log_client = \
InstanceUsagesAuditLogClientXML(auth_provider)
InstanceUsagesAuditLogClientXML(self.auth_provider)
self.volume_hosts_client = VolumeHostsClientXML(
auth_provider)
self.auth_provider)
self.volumes_extension_client = VolumeExtensionClientXML(
auth_provider)
self.auth_provider)
if CONF.service_available.ceilometer:
self.telemetry_client = TelemetryClientXML(
auth_provider)
self.auth_provider)
self.token_client = TokenClientXML()
self.token_v3_client = V3TokenClientXML()
elif self.interface == 'json':
self.certificates_client = CertificatesClientJSON(
auth_provider)
self.auth_provider)
self.certificates_v3_client = CertificatesV3ClientJSON(
auth_provider)
self.baremetal_client = BaremetalClientJSON(auth_provider)
self.servers_client = ServersClientJSON(auth_provider)
self.servers_v3_client = ServersV3ClientJSON(auth_provider)
self.limits_client = LimitsClientJSON(auth_provider)
self.images_client = ImagesClientJSON(auth_provider)
self.auth_provider)
self.baremetal_client = BaremetalClientJSON(self.auth_provider)
self.servers_client = ServersClientJSON(self.auth_provider)
self.servers_v3_client = ServersV3ClientJSON(self.auth_provider)
self.limits_client = LimitsClientJSON(self.auth_provider)
self.images_client = ImagesClientJSON(self.auth_provider)
self.keypairs_v3_client = KeyPairsV3ClientJSON(
auth_provider)
self.keypairs_client = KeyPairsClientJSON(auth_provider)
self.auth_provider)
self.keypairs_client = KeyPairsClientJSON(self.auth_provider)
self.keypairs_v3_client = KeyPairsV3ClientJSON(
auth_provider)
self.quotas_client = QuotasClientJSON(auth_provider)
self.quotas_v3_client = QuotasV3ClientJSON(auth_provider)
self.flavors_client = FlavorsClientJSON(auth_provider)
self.flavors_v3_client = FlavorsV3ClientJSON(auth_provider)
self.auth_provider)
self.quotas_client = QuotasClientJSON(self.auth_provider)
self.quotas_v3_client = QuotasV3ClientJSON(self.auth_provider)
self.flavors_client = FlavorsClientJSON(self.auth_provider)
self.flavors_v3_client = FlavorsV3ClientJSON(self.auth_provider)
self.extensions_v3_client = ExtensionsV3ClientJSON(
auth_provider)
self.auth_provider)
self.extensions_client = ExtensionsClientJSON(
auth_provider)
self.auth_provider)
self.volumes_extensions_client = VolumesExtensionsClientJSON(
auth_provider)
self.auth_provider)
self.floating_ips_client = FloatingIPsClientJSON(
auth_provider)
self.backups_client = BackupsClientJSON(auth_provider)
self.snapshots_client = SnapshotsClientJSON(auth_provider)
self.volumes_client = VolumesClientJSON(auth_provider)
self.volumes_v2_client = VolumesV2ClientJSON(auth_provider)
self.auth_provider)
self.backups_client = BackupsClientJSON(self.auth_provider)
self.snapshots_client = SnapshotsClientJSON(self.auth_provider)
self.volumes_client = VolumesClientJSON(self.auth_provider)
self.volumes_v2_client = VolumesV2ClientJSON(self.auth_provider)
self.volume_types_client = VolumeTypesClientJSON(
auth_provider)
self.identity_client = IdentityClientJSON(auth_provider)
self.auth_provider)
self.identity_client = IdentityClientJSON(self.auth_provider)
self.identity_v3_client = IdentityV3ClientJSON(
auth_provider)
self.auth_provider)
self.security_groups_client = SecurityGroupsClientJSON(
auth_provider)
self.auth_provider)
self.interfaces_v3_client = InterfacesV3ClientJSON(
auth_provider)
self.auth_provider)
self.interfaces_client = InterfacesClientJSON(
auth_provider)
self.endpoints_client = EndPointClientJSON(auth_provider)
self.fixed_ips_client = FixedIPsClientJSON(auth_provider)
self.auth_provider)
self.endpoints_client = EndPointClientJSON(self.auth_provider)
self.fixed_ips_client = FixedIPsClientJSON(self.auth_provider)
self.availability_zone_v3_client = AvailabilityZoneV3ClientJSON(
auth_provider)
self.auth_provider)
self.availability_zone_client = AvailabilityZoneClientJSON(
auth_provider)
self.auth_provider)
self.services_v3_client = ServicesV3ClientJSON(
auth_provider)
self.service_client = ServiceClientJSON(auth_provider)
self.auth_provider)
self.service_client = ServiceClientJSON(self.auth_provider)
self.aggregates_v3_client = AggregatesV3ClientJSON(
auth_provider)
self.auth_provider)
self.aggregates_client = AggregatesClientJSON(
auth_provider)
self.services_client = ServicesClientJSON(auth_provider)
self.auth_provider)
self.services_client = ServicesClientJSON(self.auth_provider)
self.tenant_usages_client = TenantUsagesClientJSON(
auth_provider)
self.version_v3_client = VersionV3ClientJSON(auth_provider)
self.policy_client = PolicyClientJSON(auth_provider)
self.hosts_client = HostsClientJSON(auth_provider)
self.auth_provider)
self.version_v3_client = VersionV3ClientJSON(self.auth_provider)
self.policy_client = PolicyClientJSON(self.auth_provider)
self.hosts_client = HostsClientJSON(self.auth_provider)
self.hypervisor_v3_client = HypervisorV3ClientJSON(
auth_provider)
self.auth_provider)
self.hypervisor_client = HypervisorClientJSON(
auth_provider)
self.network_client = NetworkClientJSON(auth_provider)
self.auth_provider)
self.network_client = NetworkClientJSON(self.auth_provider)
self.credentials_client = CredentialsClientJSON(
auth_provider)
self.auth_provider)
self.instance_usages_audit_log_client = \
InstanceUsagesAuditLogClientJSON(auth_provider)
InstanceUsagesAuditLogClientJSON(self.auth_provider)
self.volume_hosts_client = VolumeHostsClientJSON(
auth_provider)
self.auth_provider)
self.volumes_extension_client = VolumeExtensionClientJSON(
auth_provider)
self.hosts_v3_client = HostsV3ClientJSON(auth_provider)
self.auth_provider)
self.hosts_v3_client = HostsV3ClientJSON(self.auth_provider)
if CONF.service_available.ceilometer:
self.telemetry_client = TelemetryClientJSON(
auth_provider)
self.auth_provider)
self.token_client = TokenClientJSON()
self.token_v3_client = V3TokenClientJSON()
self.negative_client = NegativeRestClient(auth_provider)
self.negative_client = NegativeRestClient(self.auth_provider)
self.negative_client.service = service
else:
@ -347,47 +349,22 @@ class Manager(object):
self.credentials.get('tenant_name'))
# common clients
self.account_client = AccountClient(auth_provider)
self.account_client = AccountClient(self.auth_provider)
if CONF.service_available.glance:
self.image_client = ImageClientJSON(auth_provider)
self.image_client_v2 = ImageClientV2JSON(auth_provider)
self.container_client = ContainerClient(auth_provider)
self.object_client = ObjectClient(auth_provider)
self.image_client = ImageClientJSON(self.auth_provider)
self.image_client_v2 = ImageClientV2JSON(self.auth_provider)
self.container_client = ContainerClient(self.auth_provider)
self.object_client = ObjectClient(self.auth_provider)
self.orchestration_client = OrchestrationClient(
auth_provider)
self.auth_provider)
self.ec2api_client = botoclients.APIClientEC2(*ec2_client_args)
self.s3_client = botoclients.ObjectClientS3(*ec2_client_args)
self.custom_object_client = ObjectClientCustomizedHeader(
auth_provider)
self.auth_provider)
self.custom_account_client = \
AccountClientCustomizedHeader(auth_provider)
AccountClientCustomizedHeader(self.auth_provider)
self.data_processing_client = DataProcessingClient(
auth_provider)
@classmethod
def get_auth_provider_class(cls, auth_version):
if auth_version == 'v2':
return auth.KeystoneV2AuthProvider
else:
return auth.KeystoneV3AuthProvider
def get_default_credentials(self):
return dict(
username=CONF.identity.username,
password=CONF.identity.password,
tenant_name=CONF.identity.tenant_name
)
def get_auth_provider(self, credentials=None):
auth_params = dict(client_type='tempest',
interface=self.interface)
auth_provider_class = self.get_auth_provider_class(self.auth_version)
# If invalid / incomplete credentials are provided, use default ones
if credentials is None or \
not auth_provider_class.check_credentials(credentials):
credentials = self.credentials
auth_params['credentials'] = credentials
return auth_provider_class(**auth_params)
self.auth_provider)
class AltManager(Manager):
@ -452,3 +429,187 @@ class OrchestrationManager(Manager):
CONF.identity.tenant_name,
interface=interface,
service=service)
class OfficialClientManager(manager.Manager):
"""
Manager that provides access to the official python clients for
calling various OpenStack APIs.
"""
NOVACLIENT_VERSION = '2'
CINDERCLIENT_VERSION = '1'
HEATCLIENT_VERSION = '1'
def __init__(self, username, password, tenant_name):
# FIXME(andreaf) Auth provider for client_type 'official' is
# not implemented yet, setting to 'tempest' for now.
self.client_type = 'tempest'
self.interface = None
# super cares for credentials validation
super(OfficialClientManager, self).__init__(
username=username, password=password, tenant_name=tenant_name)
self.compute_client = self._get_compute_client(username,
password,
tenant_name)
self.identity_client = self._get_identity_client(username,
password,
tenant_name)
self.image_client = self._get_image_client()
self.network_client = self._get_network_client()
self.volume_client = self._get_volume_client(username,
password,
tenant_name)
self.object_storage_client = self._get_object_storage_client(
username,
password,
tenant_name)
self.orchestration_client = self._get_orchestration_client(
username,
password,
tenant_name)
def _get_compute_client(self, username, password, tenant_name):
# Novaclient will not execute operations for anyone but the
# identified user, so a new client needs to be created for
# each user that operations need to be performed for.
self._validate_credentials(username, password, tenant_name)
auth_url = CONF.identity.uri
dscv = CONF.identity.disable_ssl_certificate_validation
region = CONF.identity.region
client_args = (username, password, tenant_name, auth_url)
# Create our default Nova client to use in testing
service_type = CONF.compute.catalog_type
endpoint_type = CONF.compute.endpoint_type
return novaclient.client.Client(self.NOVACLIENT_VERSION,
*client_args,
service_type=service_type,
endpoint_type=endpoint_type,
region_name=region,
no_cache=True,
insecure=dscv,
http_log_debug=True)
def _get_image_client(self):
token = self.identity_client.auth_token
region = CONF.identity.region
endpoint_type = CONF.image.endpoint_type
endpoint = self.identity_client.service_catalog.url_for(
attr='region', filter_value=region,
service_type=CONF.image.catalog_type, endpoint_type=endpoint_type)
dscv = CONF.identity.disable_ssl_certificate_validation
return glanceclient.Client('1', endpoint=endpoint, token=token,
insecure=dscv)
def _get_volume_client(self, username, password, tenant_name):
auth_url = CONF.identity.uri
region = CONF.identity.region
endpoint_type = CONF.volume.endpoint_type
return cinderclient.client.Client(self.CINDERCLIENT_VERSION,
username,
password,
tenant_name,
auth_url,
region_name=region,
endpoint_type=endpoint_type,
http_log_debug=True)
def _get_object_storage_client(self, username, password, tenant_name):
auth_url = CONF.identity.uri
# add current tenant to swift operator role group.
keystone_admin = self._get_identity_client(
CONF.identity.admin_username,
CONF.identity.admin_password,
CONF.identity.admin_tenant_name)
# enable test user to operate swift by adding operator role to him.
roles = keystone_admin.roles.list()
operator_role = CONF.object_storage.operator_role
member_role = [role for role in roles if role.name == operator_role][0]
# NOTE(maurosr): This is surrounded in the try-except block cause
# neutron tests doesn't have tenant isolation.
try:
keystone_admin.roles.add_user_role(self.identity_client.user_id,
member_role.id,
self.identity_client.tenant_id)
except keystoneclient.exceptions.Conflict:
pass
endpoint_type = CONF.object_storage.endpoint_type
os_options = {'endpoint_type': endpoint_type}
return swiftclient.Connection(auth_url, username, password,
tenant_name=tenant_name,
auth_version='2',
os_options=os_options)
def _get_orchestration_client(self, username=None, password=None,
tenant_name=None):
if not username:
username = CONF.identity.admin_username
if not password:
password = CONF.identity.admin_password
if not tenant_name:
tenant_name = CONF.identity.tenant_name
self._validate_credentials(username, password, tenant_name)
keystone = self._get_identity_client(username, password, tenant_name)
region = CONF.identity.region
endpoint_type = CONF.orchestration.endpoint_type
token = keystone.auth_token
service_type = CONF.orchestration.catalog_type
try:
endpoint = keystone.service_catalog.url_for(
attr='region',
filter_value=region,
service_type=service_type,
endpoint_type=endpoint_type)
except keystoneclient.exceptions.EndpointNotFound:
return None
else:
return heatclient.client.Client(self.HEATCLIENT_VERSION,
endpoint,
token=token,
username=username,
password=password)
def _get_identity_client(self, username, password, tenant_name):
# This identity client is not intended to check the security
# of the identity service, so use admin credentials by default.
self._validate_credentials(username, password, tenant_name)
auth_url = CONF.identity.uri
dscv = CONF.identity.disable_ssl_certificate_validation
return keystoneclient.v2_0.client.Client(username=username,
password=password,
tenant_name=tenant_name,
auth_url=auth_url,
insecure=dscv)
def _get_network_client(self):
# The intended configuration is for the network client to have
# admin privileges and indicate for whom resources are being
# created via a 'tenant_id' parameter. This will often be
# preferable to authenticating as a specific user because
# working with certain resources (public routers and networks)
# often requires admin privileges anyway.
username = CONF.identity.admin_username
password = CONF.identity.admin_password
tenant_name = CONF.identity.admin_tenant_name
self._validate_credentials(username, password, tenant_name)
auth_url = CONF.identity.uri
dscv = CONF.identity.disable_ssl_certificate_validation
endpoint_type = CONF.network.endpoint_type
return neutronclient.v2_0.client.Client(username=username,
password=password,
tenant_name=tenant_name,
endpoint_type=endpoint_type,
auth_url=auth_url,
insecure=dscv)

View File

@ -12,9 +12,9 @@
# License for the specific language governing permissions and limitations
# under the License.
from tempest import clients
from tempest.common.utils import misc
from tempest import config
from tempest.scenario import manager
import json
import re
@ -35,7 +35,7 @@ class ImageUtils(object):
self.non_ssh_image_pattern = \
CONF.input_scenario.non_ssh_image_regex
# Setup clients
ocm = manager.OfficialClientManager(CONF.identity.username,
ocm = clients.OfficialClientManager(CONF.identity.username,
CONF.identity.password,
CONF.identity.tenant_name)
self.client = ocm.compute_client
@ -95,7 +95,7 @@ class InputScenarioUtils(object):
digit=string.digits)
def __init__(self):
ocm = manager.OfficialClientManager(CONF.identity.username,
ocm = clients.OfficialClientManager(CONF.identity.username,
CONF.identity.password,
CONF.identity.tenant_name)
self.client = ocm.compute_client

View File

@ -13,8 +13,12 @@
# License for the specific language governing permissions and limitations
# under the License.
from tempest import auth
from tempest import config
from tempest import exceptions
CONF = config.CONF
class Manager(object):
@ -25,7 +29,27 @@ class Manager(object):
and a client object for a test case to use in performing actions.
"""
def __init__(self):
def __init__(self, username=None, password=None, tenant_name=None):
"""
We allow overriding of the credentials used within the various
client classes managed by the Manager object. Left as None, the
standard username/password/tenant_name[/domain_name] is used.
:param credentials: Override of the credentials
"""
self.auth_version = CONF.identity.auth_version
# FIXME(andreaf) Change Manager __init__ to accept a credentials dict
if username is None or password is None:
# Tenant None is a valid use case
self.credentials = self.get_default_credentials()
else:
self.credentials = dict(username=username, password=password,
tenant_name=tenant_name)
if self.auth_version == 'v3':
self.credentials['domain_name'] = 'Default'
# Creates an auth provider for the credentials
self.auth_provider = self.get_auth_provider(self.credentials)
# FIXME(andreaf) unused
self.client_attr_names = []
# we do this everywhere, have it be part of the super class
@ -36,3 +60,28 @@ class Manager(object):
"tenant_name: %(t)s" %
{'u': username, 'p': password, 't': tenant_name})
raise exceptions.InvalidConfiguration(msg)
@classmethod
def get_auth_provider_class(cls, auth_version):
if auth_version == 'v2':
return auth.KeystoneV2AuthProvider
else:
return auth.KeystoneV3AuthProvider
def get_default_credentials(self):
return dict(
username=CONF.identity.username,
password=CONF.identity.password,
tenant_name=CONF.identity.tenant_name
)
def get_auth_provider(self, credentials=None):
auth_params = dict(client_type=getattr(self, 'client_type', None),
interface=getattr(self, 'interface', None))
auth_provider_class = self.get_auth_provider_class(self.auth_version)
# If invalid / incomplete credentials are provided, use default ones
if credentials is None or \
not auth_provider_class.check_credentials(credentials):
credentials = self.credentials
auth_params['credentials'] = credentials
return auth_provider_class(**auth_params)

View File

@ -18,26 +18,17 @@ import logging
import os
import subprocess
# Default client libs
import cinderclient.client
import glanceclient
import heatclient.client
import keystoneclient.exceptions
import keystoneclient.v2_0.client
import netaddr
from neutronclient.common import exceptions as exc
import neutronclient.v2_0.client
import novaclient.client
from novaclient import exceptions as nova_exceptions
import swiftclient
from tempest.api.network import common as net_common
from tempest import clients
from tempest.common import isolated_creds
from tempest.common.utils import data_utils
from tempest.common.utils.linux.remote_client import RemoteClient
from tempest import config
from tempest import exceptions
import tempest.manager
from tempest.openstack.common import log
import tempest.test
@ -53,184 +44,6 @@ LOG_cinder_client = logging.getLogger('cinderclient.client')
LOG_cinder_client.addHandler(log.NullHandler())
class OfficialClientManager(tempest.manager.Manager):
"""
Manager that provides access to the official python clients for
calling various OpenStack APIs.
"""
NOVACLIENT_VERSION = '2'
CINDERCLIENT_VERSION = '1'
HEATCLIENT_VERSION = '1'
def __init__(self, username, password, tenant_name):
super(OfficialClientManager, self).__init__()
self.compute_client = self._get_compute_client(username,
password,
tenant_name)
self.identity_client = self._get_identity_client(username,
password,
tenant_name)
self.image_client = self._get_image_client()
self.network_client = self._get_network_client()
self.volume_client = self._get_volume_client(username,
password,
tenant_name)
self.object_storage_client = self._get_object_storage_client(
username,
password,
tenant_name)
self.orchestration_client = self._get_orchestration_client(
username,
password,
tenant_name)
def _get_compute_client(self, username, password, tenant_name):
# Novaclient will not execute operations for anyone but the
# identified user, so a new client needs to be created for
# each user that operations need to be performed for.
self._validate_credentials(username, password, tenant_name)
auth_url = CONF.identity.uri
dscv = CONF.identity.disable_ssl_certificate_validation
region = CONF.identity.region
client_args = (username, password, tenant_name, auth_url)
# Create our default Nova client to use in testing
service_type = CONF.compute.catalog_type
endpoint_type = CONF.compute.endpoint_type
return novaclient.client.Client(self.NOVACLIENT_VERSION,
*client_args,
service_type=service_type,
endpoint_type=endpoint_type,
region_name=region,
no_cache=True,
insecure=dscv,
http_log_debug=True)
def _get_image_client(self):
token = self.identity_client.auth_token
region = CONF.identity.region
endpoint_type = CONF.image.endpoint_type
endpoint = self.identity_client.service_catalog.url_for(
attr='region', filter_value=region,
service_type=CONF.image.catalog_type, endpoint_type=endpoint_type)
dscv = CONF.identity.disable_ssl_certificate_validation
return glanceclient.Client('1', endpoint=endpoint, token=token,
insecure=dscv)
def _get_volume_client(self, username, password, tenant_name):
auth_url = CONF.identity.uri
region = CONF.identity.region
endpoint_type = CONF.volume.endpoint_type
return cinderclient.client.Client(self.CINDERCLIENT_VERSION,
username,
password,
tenant_name,
auth_url,
region_name=region,
endpoint_type=endpoint_type,
http_log_debug=True)
def _get_object_storage_client(self, username, password, tenant_name):
auth_url = CONF.identity.uri
# add current tenant to swift operator role group.
keystone_admin = self._get_identity_client(
CONF.identity.admin_username,
CONF.identity.admin_password,
CONF.identity.admin_tenant_name)
# enable test user to operate swift by adding operator role to him.
roles = keystone_admin.roles.list()
operator_role = CONF.object_storage.operator_role
member_role = [role for role in roles if role.name == operator_role][0]
# NOTE(maurosr): This is surrounded in the try-except block cause
# neutron tests doesn't have tenant isolation.
try:
keystone_admin.roles.add_user_role(self.identity_client.user_id,
member_role.id,
self.identity_client.tenant_id)
except keystoneclient.exceptions.Conflict:
pass
endpoint_type = CONF.object_storage.endpoint_type
os_options = {'endpoint_type': endpoint_type}
return swiftclient.Connection(auth_url, username, password,
tenant_name=tenant_name,
auth_version='2',
os_options=os_options)
def _get_orchestration_client(self, username=None, password=None,
tenant_name=None):
if not username:
username = CONF.identity.admin_username
if not password:
password = CONF.identity.admin_password
if not tenant_name:
tenant_name = CONF.identity.tenant_name
self._validate_credentials(username, password, tenant_name)
keystone = self._get_identity_client(username, password, tenant_name)
region = CONF.identity.region
endpoint_type = CONF.orchestration.endpoint_type
token = keystone.auth_token
service_type = CONF.orchestration.catalog_type
try:
endpoint = keystone.service_catalog.url_for(
attr='region',
filter_value=region,
service_type=service_type,
endpoint_type=endpoint_type)
except keystoneclient.exceptions.EndpointNotFound:
return None
else:
return heatclient.client.Client(self.HEATCLIENT_VERSION,
endpoint,
token=token,
username=username,
password=password)
def _get_identity_client(self, username, password, tenant_name):
# This identity client is not intended to check the security
# of the identity service, so use admin credentials by default.
self._validate_credentials(username, password, tenant_name)
auth_url = CONF.identity.uri
dscv = CONF.identity.disable_ssl_certificate_validation
return keystoneclient.v2_0.client.Client(username=username,
password=password,
tenant_name=tenant_name,
auth_url=auth_url,
insecure=dscv)
def _get_network_client(self):
# The intended configuration is for the network client to have
# admin privileges and indicate for whom resources are being
# created via a 'tenant_id' parameter. This will often be
# preferable to authenticating as a specific user because
# working with certain resources (public routers and networks)
# often requires admin privileges anyway.
username = CONF.identity.admin_username
password = CONF.identity.admin_password
tenant_name = CONF.identity.admin_tenant_name
self._validate_credentials(username, password, tenant_name)
auth_url = CONF.identity.uri
dscv = CONF.identity.disable_ssl_certificate_validation
endpoint_type = CONF.network.endpoint_type
return neutronclient.v2_0.client.Client(username=username,
password=password,
tenant_name=tenant_name,
endpoint_type=endpoint_type,
auth_url=auth_url,
insecure=dscv)
class OfficialClientTest(tempest.test.BaseTestCase):
"""
Official Client test base class for scenario testing.
@ -253,7 +66,8 @@ class OfficialClientTest(tempest.test.BaseTestCase):
username, password, tenant_name = cls.credentials()
cls.manager = OfficialClientManager(username, password, tenant_name)
cls.manager = clients.OfficialClientManager(
username, password, tenant_name)
cls.compute_client = cls.manager.compute_client
cls.image_client = cls.manager.image_client
cls.identity_client = cls.manager.identity_client

View File

@ -13,13 +13,13 @@
# License for the specific language governing permissions and limitations
# under the License.
from tempest import clients
from tempest.common import debug
from tempest.common.utils import data_utils
from tempest import config
from tempest import exceptions
from tempest.openstack.common import log as logging
from tempest.scenario import manager
from tempest.scenario.manager import OfficialClientManager
from tempest.test import attr
from tempest.test import call_until_true
from tempest.test import services
@ -102,7 +102,7 @@ class TestSecurityGroupsBasicOps(manager.NetworkScenarioTest):
"""
def __init__(self, tenant_id, tenant_user, tenant_pass, tenant_name):
self.manager = OfficialClientManager(
self.manager = clients.OfficialClientManager(
tenant_user,
tenant_pass,
tenant_name

View File

@ -35,6 +35,7 @@ class BotoClientBase(object):
def __init__(self, username=None, password=None,
auth_url=None, tenant_name=None,
*args, **kwargs):
# FIXME(andreaf) replace credentials and auth_url with auth_provider
self.connection_timeout = str(CONF.boto.http_socket_timeout)
self.num_retries = str(CONF.boto.num_retries)
@ -45,6 +46,7 @@ class BotoClientBase(object):
"tenant_name": tenant_name}
def _keystone_aws_get(self):
# FIXME(andreaf) Move EC2 credentials to AuthProvider
import keystoneclient.v2_0.client
keystone = keystoneclient.v2_0.client.Client(**self.ks_cred)