From b8a52287e72fdcedab0942879eea42bc541b40fd Mon Sep 17 00:00:00 2001 From: andreaf Date: Thu, 19 Mar 2015 22:21:54 +0000 Subject: [PATCH] Drop auth and corresponding unit tests auth has been migrated to tempest-lib. Fix all auth imports to use tempest-lib. Drop auth and related unit tests. Use token client from tempest-lib as well. Depends-on: Ie6435b4f3a367b0a8cec68f21c0b4f5f61d6b688 Change-Id: I8c2772d9fb42d352f4a1d3e74e20ce6e8f483559 --- requirements.txt | 2 +- .../admin/v3/test_default_project_id.py | 2 +- tempest/auth.py | 659 ------------------ tempest/clients.py | 4 +- tempest/cmd/javelin.py | 6 +- tempest/common/cred_provider.py | 2 +- tempest/manager.py | 3 +- tempest/tests/common/test_accounts.py | 4 +- tempest/tests/common/test_cred_provider.py | 40 +- tempest/tests/fake_credentials.py | 63 -- tempest/tests/test_auth.py | 411 ----------- tempest/tests/test_credentials.py | 180 ----- tempest/tests/test_tenant_isolation.py | 2 +- 13 files changed, 46 insertions(+), 1332 deletions(-) delete mode 100644 tempest/auth.py delete mode 100644 tempest/tests/fake_credentials.py delete mode 100644 tempest/tests/test_auth.py delete mode 100644 tempest/tests/test_credentials.py diff --git a/requirements.txt b/requirements.txt index 0d7fc0dc84..174c7c80e5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -23,4 +23,4 @@ six>=1.9.0 iso8601>=0.1.9 fixtures>=0.3.14 testscenarios>=0.4 -tempest-lib>=0.4.0 +tempest-lib>=0.5.0 diff --git a/tempest/api/identity/admin/v3/test_default_project_id.py b/tempest/api/identity/admin/v3/test_default_project_id.py index 9841cc8546..f9495947c8 100644 --- a/tempest/api/identity/admin/v3/test_default_project_id.py +++ b/tempest/api/identity/admin/v3/test_default_project_id.py @@ -10,10 +10,10 @@ # License for the specific language governing permissions and limitations # under the License. +from tempest_lib import auth from tempest_lib.common.utils import data_utils from tempest.api.identity import base -from tempest import auth from tempest import clients from tempest import config from tempest import manager diff --git a/tempest/auth.py b/tempest/auth.py deleted file mode 100644 index 113ad696aa..0000000000 --- a/tempest/auth.py +++ /dev/null @@ -1,659 +0,0 @@ -# Copyright 2014 Hewlett-Packard Development Company, L.P. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import abc -import copy -import datetime -import exceptions -import re -import urlparse - -from oslo_log import log as logging -import six - -from tempest.services.identity.v2.json import token_client as json_v2id -from tempest.services.identity.v3.json import token_client as json_v3id - - -LOG = logging.getLogger(__name__) - - -@six.add_metaclass(abc.ABCMeta) -class AuthProvider(object): - """ - Provide authentication - """ - - def __init__(self, credentials): - """ - :param credentials: credentials for authentication - """ - if self.check_credentials(credentials): - self.credentials = credentials - else: - raise TypeError("Invalid credentials") - self.cache = None - self.alt_auth_data = None - self.alt_part = None - - def __str__(self): - return "Creds :{creds}, cached auth data: {cache}".format( - creds=self.credentials, cache=self.cache) - - @abc.abstractmethod - def _decorate_request(self, filters, method, url, headers=None, body=None, - auth_data=None): - """ - Decorate request with authentication data - """ - return - - @abc.abstractmethod - def _get_auth(self): - return - - @abc.abstractmethod - def _fill_credentials(self, auth_data_body): - return - - def fill_credentials(self): - """ - Fill credentials object with data from auth - """ - auth_data = self.get_auth() - self._fill_credentials(auth_data[1]) - return self.credentials - - @classmethod - def check_credentials(cls, credentials): - """ - Verify credentials are valid. - """ - return isinstance(credentials, Credentials) and credentials.is_valid() - - @property - def auth_data(self): - return self.get_auth() - - @auth_data.deleter - def auth_data(self): - self.clear_auth() - - def get_auth(self): - """ - Returns auth from cache if available, else auth first - """ - if self.cache is None or self.is_expired(self.cache): - self.set_auth() - return self.cache - - def set_auth(self): - """ - Forces setting auth, ignores cache if it exists. - Refills credentials - """ - self.cache = self._get_auth() - self._fill_credentials(self.cache[1]) - - def clear_auth(self): - """ - Can be called to clear the access cache so that next request - will fetch a new token and base_url. - """ - self.cache = None - self.credentials.reset() - - @abc.abstractmethod - def is_expired(self, auth_data): - return - - def auth_request(self, method, url, headers=None, body=None, filters=None): - """ - Obtains auth data and decorates a request with that. - :param method: HTTP method of the request - :param url: relative URL of the request (path) - :param headers: HTTP headers of the request - :param body: HTTP body in case of POST / PUT - :param filters: select a base URL out of the catalog - :returns a Tuple (url, headers, body) - """ - orig_req = dict(url=url, headers=headers, body=body) - - auth_url, auth_headers, auth_body = self._decorate_request( - filters, method, url, headers, body) - auth_req = dict(url=auth_url, headers=auth_headers, body=auth_body) - - # Overwrite part if the request if it has been requested - if self.alt_part is not None: - if self.alt_auth_data is not None: - alt_url, alt_headers, alt_body = self._decorate_request( - filters, method, url, headers, body, - auth_data=self.alt_auth_data) - alt_auth_req = dict(url=alt_url, headers=alt_headers, - body=alt_body) - auth_req[self.alt_part] = alt_auth_req[self.alt_part] - - else: - # If alt auth data is None, skip auth in the requested part - auth_req[self.alt_part] = orig_req[self.alt_part] - - # Next auth request will be normal, unless otherwise requested - self.reset_alt_auth_data() - - return auth_req['url'], auth_req['headers'], auth_req['body'] - - def reset_alt_auth_data(self): - """ - Configure auth provider to provide valid authentication data - """ - self.alt_part = None - self.alt_auth_data = None - - def set_alt_auth_data(self, request_part, auth_data): - """ - Configure auth provider to provide alt authentication data - on a part of the *next* auth_request. If credentials are None, - set invalid data. - :param request_part: request part to contain invalid auth: url, - headers, body - :param auth_data: alternative auth_data from which to get the - invalid data to be injected - """ - self.alt_part = request_part - self.alt_auth_data = auth_data - - @abc.abstractmethod - def base_url(self, filters, auth_data=None): - """ - Extracts the base_url based on provided filters - """ - return - - -class KeystoneAuthProvider(AuthProvider): - - token_expiry_threshold = datetime.timedelta(seconds=60) - - def __init__(self, credentials, auth_url, - disable_ssl_certificate_validation=None, - ca_certs=None, trace_requests=None): - super(KeystoneAuthProvider, self).__init__(credentials) - self.dsvm = disable_ssl_certificate_validation - self.ca_certs = ca_certs - self.trace_requests = trace_requests - self.auth_client = self._auth_client(auth_url) - - def _decorate_request(self, filters, method, url, headers=None, body=None, - auth_data=None): - if auth_data is None: - auth_data = self.auth_data - token, _ = auth_data - base_url = self.base_url(filters=filters, auth_data=auth_data) - # build authenticated request - # returns new request, it does not touch the original values - _headers = copy.deepcopy(headers) if headers is not None else {} - _headers['X-Auth-Token'] = str(token) - if url is None or url == "": - _url = base_url - else: - # Join base URL and url, and remove multiple contiguous slashes - _url = "/".join([base_url, url]) - parts = [x for x in urlparse.urlparse(_url)] - parts[2] = re.sub("/{2,}", "/", parts[2]) - _url = urlparse.urlunparse(parts) - # no change to method or body - return str(_url), _headers, body - - @abc.abstractmethod - def _auth_client(self): - return - - @abc.abstractmethod - def _auth_params(self): - return - - def _get_auth(self): - # Bypasses the cache - auth_func = getattr(self.auth_client, 'get_token') - auth_params = self._auth_params() - - # returns token, auth_data - token, auth_data = auth_func(**auth_params) - return token, auth_data - - def get_token(self): - return self.auth_data[0] - - -class KeystoneV2AuthProvider(KeystoneAuthProvider): - - EXPIRY_DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ' - - def _auth_client(self, auth_url): - return json_v2id.TokenClientJSON( - auth_url, disable_ssl_certificate_validation=self.dsvm, - ca_certs=self.ca_certs, trace_requests=self.trace_requests) - - def _auth_params(self): - return dict( - user=self.credentials.username, - password=self.credentials.password, - tenant=self.credentials.tenant_name, - auth_data=True) - - def _fill_credentials(self, auth_data_body): - tenant = auth_data_body['token']['tenant'] - user = auth_data_body['user'] - if self.credentials.tenant_name is None: - self.credentials.tenant_name = tenant['name'] - if self.credentials.tenant_id is None: - self.credentials.tenant_id = tenant['id'] - if self.credentials.username is None: - self.credentials.username = user['name'] - if self.credentials.user_id is None: - self.credentials.user_id = user['id'] - - def base_url(self, filters, auth_data=None): - """ - Filters can be: - - service: compute, image, etc - - region: the service region - - endpoint_type: adminURL, publicURL, internalURL - - api_version: replace catalog version with this - - skip_path: take just the base URL - """ - if auth_data is None: - auth_data = self.auth_data - token, _auth_data = auth_data - service = filters.get('service') - region = filters.get('region') - endpoint_type = filters.get('endpoint_type', 'publicURL') - - if service is None: - raise exceptions.EndpointNotFound("No service provided") - - _base_url = None - for ep in _auth_data['serviceCatalog']: - if ep["type"] == service: - for _ep in ep['endpoints']: - if region is not None and _ep['region'] == region: - _base_url = _ep.get(endpoint_type) - if not _base_url: - # No region matching, use the first - _base_url = ep['endpoints'][0].get(endpoint_type) - break - if _base_url is None: - raise exceptions.EndpointNotFound(service) - - parts = urlparse.urlparse(_base_url) - if filters.get('api_version', None) is not None: - path = "/" + filters['api_version'] - noversion_path = "/".join(parts.path.split("/")[2:]) - if noversion_path != "": - path += "/" + noversion_path - _base_url = _base_url.replace(parts.path, path) - if filters.get('skip_path', None) is not None and parts.path != '': - _base_url = _base_url.replace(parts.path, "/") - - return _base_url - - def is_expired(self, auth_data): - _, access = auth_data - expiry = datetime.datetime.strptime(access['token']['expires'], - self.EXPIRY_DATE_FORMAT) - return expiry - self.token_expiry_threshold <= \ - datetime.datetime.utcnow() - - -class KeystoneV3AuthProvider(KeystoneAuthProvider): - - EXPIRY_DATE_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ' - - def _auth_client(self, auth_url): - return json_v3id.V3TokenClientJSON( - auth_url, disable_ssl_certificate_validation=self.dsvm, - ca_certs=self.ca_certs, trace_requests=self.trace_requests) - - def _auth_params(self): - return dict( - user_id=self.credentials.user_id, - username=self.credentials.username, - password=self.credentials.password, - project_id=self.credentials.project_id, - project_name=self.credentials.project_name, - user_domain_id=self.credentials.user_domain_id, - user_domain_name=self.credentials.user_domain_name, - project_domain_id=self.credentials.project_domain_id, - project_domain_name=self.credentials.project_domain_name, - domain_id=self.credentials.domain_id, - domain_name=self.credentials.domain_name, - auth_data=True) - - def _fill_credentials(self, auth_data_body): - # project or domain, depending on the scope - project = auth_data_body.get('project', None) - domain = auth_data_body.get('domain', None) - # user is always there - user = auth_data_body['user'] - # Set project fields - if project is not None: - if self.credentials.project_name is None: - self.credentials.project_name = project['name'] - if self.credentials.project_id is None: - self.credentials.project_id = project['id'] - if self.credentials.project_domain_id is None: - self.credentials.project_domain_id = project['domain']['id'] - if self.credentials.project_domain_name is None: - self.credentials.project_domain_name = \ - project['domain']['name'] - # Set domain fields - if domain is not None: - if self.credentials.domain_id is None: - self.credentials.domain_id = domain['id'] - if self.credentials.domain_name is None: - self.credentials.domain_name = domain['name'] - # Set user fields - if self.credentials.username is None: - self.credentials.username = user['name'] - if self.credentials.user_id is None: - self.credentials.user_id = user['id'] - if self.credentials.user_domain_id is None: - self.credentials.user_domain_id = user['domain']['id'] - if self.credentials.user_domain_name is None: - self.credentials.user_domain_name = user['domain']['name'] - - def base_url(self, filters, auth_data=None): - """ - Filters can be: - - service: compute, image, etc - - region: the service region - - endpoint_type: adminURL, publicURL, internalURL - - api_version: replace catalog version with this - - skip_path: take just the base URL - """ - if auth_data is None: - auth_data = self.auth_data - token, _auth_data = auth_data - service = filters.get('service') - region = filters.get('region') - endpoint_type = filters.get('endpoint_type', 'public') - - if service is None: - raise exceptions.EndpointNotFound("No service provided") - - if 'URL' in endpoint_type: - endpoint_type = endpoint_type.replace('URL', '') - _base_url = None - catalog = _auth_data['catalog'] - # Select entries with matching service type - service_catalog = [ep for ep in catalog if ep['type'] == service] - if len(service_catalog) > 0: - service_catalog = service_catalog[0]['endpoints'] - else: - # No matching service - raise exceptions.EndpointNotFound(service) - # Filter by endpoint type (interface) - filtered_catalog = [ep for ep in service_catalog if - ep['interface'] == endpoint_type] - if len(filtered_catalog) == 0: - # No matching type, keep all and try matching by region at least - filtered_catalog = service_catalog - # Filter by region - filtered_catalog = [ep for ep in filtered_catalog if - ep['region'] == region] - if len(filtered_catalog) == 0: - # No matching region, take the first endpoint - filtered_catalog = [service_catalog[0]] - # There should be only one match. If not take the first. - _base_url = filtered_catalog[0].get('url', None) - if _base_url is None: - raise exceptions.EndpointNotFound(service) - - parts = urlparse.urlparse(_base_url) - if filters.get('api_version', None) is not None: - path = "/" + filters['api_version'] - noversion_path = "/".join(parts.path.split("/")[2:]) - if noversion_path != "": - path += "/" + noversion_path - _base_url = _base_url.replace(parts.path, path) - if filters.get('skip_path', None) is not None: - _base_url = _base_url.replace(parts.path, "/") - - return _base_url - - def is_expired(self, auth_data): - _, access = auth_data - expiry = datetime.datetime.strptime(access['expires_at'], - self.EXPIRY_DATE_FORMAT) - return expiry - self.token_expiry_threshold <= \ - datetime.datetime.utcnow() - - -def is_identity_version_supported(identity_version): - return identity_version in IDENTITY_VERSION - - -def get_credentials(auth_url, fill_in=True, identity_version='v2', - disable_ssl_certificate_validation=None, ca_certs=None, - trace_requests=None, **kwargs): - """ - Builds a credentials object based on the configured auth_version - - :param auth_url (string): Full URI of the OpenStack Identity API(Keystone) - which is used to fetch the token from Identity service. - :param fill_in (boolean): obtain a token and fill in all credential - details provided by the identity service. When fill_in is not - specified, credentials are not validated. Validation can be invoked - by invoking ``is_valid()`` - :param identity_version (string): identity API version is used to - select the matching auth provider and credentials class - :param disable_ssl_certificate_validation: whether to enforce SSL - certificate validation in SSL API requests to the auth system - :param ca_certs: CA certificate bundle for validation of certificates - in SSL API requests to the auth system - :param trace_requests: trace in log API requests to the auth system - :param kwargs (dict): Dict of credential key/value pairs - - Examples: - - Returns credentials from the provided parameters: - >>> get_credentials(username='foo', password='bar') - - Returns credentials including IDs: - >>> get_credentials(username='foo', password='bar', fill_in=True) - """ - if not is_identity_version_supported(identity_version): - raise exceptions.InvalidIdentityVersion( - identity_version=identity_version) - - credential_class, auth_provider_class = IDENTITY_VERSION.get( - identity_version) - - creds = credential_class(**kwargs) - # Fill in the credentials fields that were not specified - if fill_in: - dsvm = disable_ssl_certificate_validation - auth_provider = auth_provider_class( - creds, auth_url, disable_ssl_certificate_validation=dsvm, - ca_certs=ca_certs, trace_requests=trace_requests) - creds = auth_provider.fill_credentials() - return creds - - -class Credentials(object): - """ - Set of credentials for accessing OpenStack services - - ATTRIBUTES: list of valid class attributes representing credentials. - """ - - ATTRIBUTES = [] - - def __init__(self, **kwargs): - """ - Enforce the available attributes at init time (only). - Additional attributes can still be set afterwards if tests need - to do so. - """ - self._initial = kwargs - self._apply_credentials(kwargs) - - def _apply_credentials(self, attr): - for key in attr.keys(): - if key in self.ATTRIBUTES: - setattr(self, key, attr[key]) - else: - raise exceptions.InvalidCredentials - - def __str__(self): - """ - Represent only attributes included in self.ATTRIBUTES - """ - _repr = dict((k, getattr(self, k)) for k in self.ATTRIBUTES) - return str(_repr) - - def __eq__(self, other): - """ - Credentials are equal if attributes in self.ATTRIBUTES are equal - """ - return str(self) == str(other) - - def __getattr__(self, key): - # If an attribute is set, __getattr__ is not invoked - # If an attribute is not set, and it is a known one, return None - if key in self.ATTRIBUTES: - return None - else: - raise AttributeError - - def __delitem__(self, key): - # For backwards compatibility, support dict behaviour - if key in self.ATTRIBUTES: - delattr(self, key) - else: - raise AttributeError - - def get(self, item, default): - # In this patch act as dict for backward compatibility - try: - return getattr(self, item) - except AttributeError: - return default - - def get_init_attributes(self): - return self._initial.keys() - - def is_valid(self): - raise NotImplementedError - - def reset(self): - # First delete all known attributes - for key in self.ATTRIBUTES: - if getattr(self, key) is not None: - delattr(self, key) - # Then re-apply initial setup - self._apply_credentials(self._initial) - - -class KeystoneV2Credentials(Credentials): - - ATTRIBUTES = ['username', 'password', 'tenant_name', 'user_id', - 'tenant_id'] - - def is_valid(self): - """ - Minimum set of valid credentials, are username and password. - Tenant is optional. - """ - return None not in (self.username, self.password) - - -class KeystoneV3Credentials(Credentials): - """ - Credentials suitable for the Keystone Identity V3 API - """ - - ATTRIBUTES = ['domain_id', 'domain_name', 'password', 'username', - 'project_domain_id', 'project_domain_name', 'project_id', - 'project_name', 'tenant_id', 'tenant_name', 'user_domain_id', - 'user_domain_name', 'user_id'] - - def __setattr__(self, key, value): - parent = super(KeystoneV3Credentials, self) - # for tenant_* set both project and tenant - if key == 'tenant_id': - parent.__setattr__('project_id', value) - elif key == 'tenant_name': - parent.__setattr__('project_name', value) - # for project_* set both project and tenant - if key == 'project_id': - parent.__setattr__('tenant_id', value) - elif key == 'project_name': - parent.__setattr__('tenant_name', value) - # for *_domain_* set both user and project if not set yet - if key == 'user_domain_id': - if self.project_domain_id is None: - parent.__setattr__('project_domain_id', value) - if key == 'project_domain_id': - if self.user_domain_id is None: - parent.__setattr__('user_domain_id', value) - if key == 'user_domain_name': - if self.project_domain_name is None: - parent.__setattr__('project_domain_name', value) - if key == 'project_domain_name': - if self.user_domain_name is None: - parent.__setattr__('user_domain_name', value) - # support domain_name coming from config - if key == 'domain_name': - parent.__setattr__('user_domain_name', value) - parent.__setattr__('project_domain_name', value) - # finally trigger default behaviour for all attributes - parent.__setattr__(key, value) - - def is_valid(self): - """ - Valid combinations of v3 credentials (excluding token, scope) - - User id, password (optional domain) - - User name, password and its domain id/name - For the scope, valid combinations are: - - None - - Project id (optional domain) - - Project name and its domain id/name - - Domain id - - Domain name - """ - valid_user_domain = any( - [self.user_domain_id is not None, - self.user_domain_name is not None]) - valid_project_domain = any( - [self.project_domain_id is not None, - self.project_domain_name is not None]) - valid_user = any( - [self.user_id is not None, - self.username is not None and valid_user_domain]) - valid_project_scope = any( - [self.project_name is None and self.project_id is None, - self.project_id is not None, - self.project_name is not None and valid_project_domain]) - valid_domain_scope = any( - [self.domain_id is None and self.domain_name is None, - self.domain_id or self.domain_name]) - return all([self.password is not None, - valid_user, - valid_project_scope and valid_domain_scope]) - - -IDENTITY_VERSION = {'v2': (KeystoneV2Credentials, KeystoneV2AuthProvider), - 'v3': (KeystoneV3Credentials, KeystoneV3AuthProvider)} diff --git a/tempest/clients.py b/tempest/clients.py index e1b6eab13b..a0c9471dba 100644 --- a/tempest/clients.py +++ b/tempest/clients.py @@ -16,6 +16,8 @@ import copy from oslo_log import log as logging +from tempest_lib.services.identity.v2.token_client import TokenClientJSON +from tempest_lib.services.identity.v3.token_client import V3TokenClientJSON from tempest.common import cred_provider from tempest.common import negative_rest_client @@ -77,7 +79,6 @@ from tempest.services.database.json.versions_client import \ DatabaseVersionsClientJSON from tempest.services.identity.v2.json.identity_client import \ IdentityClientJSON -from tempest.services.identity.v2.json.token_client import TokenClientJSON from tempest.services.identity.v3.json.credentials_client import \ CredentialsClientJSON from tempest.services.identity.v3.json.endpoints_client import \ @@ -88,7 +89,6 @@ from tempest.services.identity.v3.json.policy_client import PolicyClientJSON from tempest.services.identity.v3.json.region_client import RegionClientJSON from tempest.services.identity.v3.json.service_client import \ ServiceClientJSON -from tempest.services.identity.v3.json.token_client import V3TokenClientJSON from tempest.services.image.v1.json.image_client import ImageClientJSON from tempest.services.image.v2.json.image_client import ImageClientV2JSON from tempest.services.messaging.json.messaging_client import \ diff --git a/tempest/cmd/javelin.py b/tempest/cmd/javelin.py index f84771fd88..b2311b0472 100755 --- a/tempest/cmd/javelin.py +++ b/tempest/cmd/javelin.py @@ -114,10 +114,10 @@ import unittest import netaddr from oslo_log import log as logging from oslo_utils import timeutils +from tempest_lib import auth from tempest_lib import exceptions as lib_exc import yaml -import tempest.auth from tempest import config from tempest.services.compute.json import flavors_client from tempest.services.compute.json import floating_ips_client @@ -175,7 +175,7 @@ class OSClient(object): } object_storage_params.update(default_params) - _creds = tempest.auth.KeystoneV2Credentials( + _creds = auth.KeystoneV2Credentials( username=user, password=pw, tenant_name=tenant) @@ -185,7 +185,7 @@ class OSClient(object): 'ca_certs': CONF.identity.ca_certificates_file, 'trace_requests': CONF.debug.trace_requests } - _auth = tempest.auth.KeystoneV2AuthProvider( + _auth = auth.KeystoneV2AuthProvider( _creds, CONF.identity.uri, **auth_provider_params) self.identity = identity_client.IdentityClientJSON( _auth, diff --git a/tempest/common/cred_provider.py b/tempest/common/cred_provider.py index 32230276ee..461097f76b 100644 --- a/tempest/common/cred_provider.py +++ b/tempest/common/cred_provider.py @@ -16,8 +16,8 @@ import abc from oslo_log import log as logging import six +from tempest_lib import auth -from tempest import auth from tempest import config from tempest import exceptions diff --git a/tempest/manager.py b/tempest/manager.py index 025ce65d99..c39d3e5d1c 100644 --- a/tempest/manager.py +++ b/tempest/manager.py @@ -13,7 +13,8 @@ # License for the specific language governing permissions and limitations # under the License. -from tempest import auth +from tempest_lib import auth + from tempest.common import cred_provider from tempest import config from tempest import exceptions diff --git a/tempest/tests/common/test_accounts.py b/tempest/tests/common/test_accounts.py index b4048ba99c..f357bf457e 100644 --- a/tempest/tests/common/test_accounts.py +++ b/tempest/tests/common/test_accounts.py @@ -20,13 +20,13 @@ from oslo_concurrency.fixture import lockutils as lockutils_fixtures from oslo_concurrency import lockutils from oslo_config import cfg from oslotest import mockpatch +from tempest_lib import auth +from tempest_lib.services.identity.v2 import token_client -from tempest import auth from tempest.common import accounts from tempest.common import cred_provider from tempest import config from tempest import exceptions -from tempest.services.identity.v2.json import token_client from tempest.tests import base from tempest.tests import fake_config from tempest.tests import fake_http diff --git a/tempest/tests/common/test_cred_provider.py b/tempest/tests/common/test_cred_provider.py index 76430ac012..ed3f9318a1 100644 --- a/tempest/tests/common/test_cred_provider.py +++ b/tempest/tests/common/test_cred_provider.py @@ -13,21 +13,21 @@ # under the License. from oslo_config import cfg +from tempest_lib import auth +from tempest_lib import exceptions as lib_exc +from tempest_lib.services.identity.v2 import token_client as v2_client +from tempest_lib.services.identity.v3 import token_client as v3_client + -from tempest import auth from tempest.common import cred_provider from tempest.common import tempest_fixtures as fixtures from tempest import config -from tempest.services.identity.v2.json import token_client as v2_client -from tempest.services.identity.v3.json import token_client as v3_client +from tempest.tests import base from tempest.tests import fake_config from tempest.tests import fake_identity -# Note(andreaf): once credentials tests move to tempest-lib, I will copy the -# parts of them required by these here. -from tempest.tests import test_credentials as test_creds -class ConfiguredV2CredentialsTests(test_creds.CredentialsTests): +class ConfiguredV2CredentialsTests(base.TestCase): attributes = { 'username': 'fake_username', 'password': 'fake_password', @@ -46,6 +46,23 @@ class ConfiguredV2CredentialsTests(test_creds.CredentialsTests): self.stubs.Set(self.tokenclient_class, 'raw_request', self.identity_response) + def _get_credentials(self, attributes=None): + if attributes is None: + attributes = self.attributes + return self.credentials_class(**attributes) + + def _check(self, credentials, credentials_class, filled): + # Check the right version of credentials has been returned + self.assertIsInstance(credentials, credentials_class) + # Check the id attributes are filled in + attributes = [x for x in credentials.ATTRIBUTES if ( + '_id' in x and x != 'domain_id')] + for attr in attributes: + if filled: + self.assertIsNotNone(getattr(credentials, attr)) + else: + self.assertIsNone(getattr(credentials, attr)) + def _verify_credentials(self, credentials_class, filled=True, identity_version=None): for ctype in cred_provider.CREDENTIAL_TYPES: @@ -58,6 +75,15 @@ class ConfiguredV2CredentialsTests(test_creds.CredentialsTests): identity_version=identity_version) self._check(creds, credentials_class, filled) + def test_create(self): + creds = self._get_credentials() + self.assertEqual(self.attributes, creds._initial) + + def test_create_invalid_attr(self): + self.assertRaises(lib_exc.InvalidCredentials, + self._get_credentials, + attributes=dict(invalid='fake')) + def test_get_configured_credentials(self): self.useFixture(fixtures.LockFixture('auth_version')) self._verify_credentials(credentials_class=self.credentials_class) diff --git a/tempest/tests/fake_credentials.py b/tempest/tests/fake_credentials.py deleted file mode 100644 index 649d51df6c..0000000000 --- a/tempest/tests/fake_credentials.py +++ /dev/null @@ -1,63 +0,0 @@ -# Copyright 2014 Hewlett-Packard Development Company, L.P. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from tempest import auth - - -class FakeCredentials(auth.Credentials): - - def is_valid(self): - return True - - -class FakeKeystoneV2Credentials(auth.KeystoneV2Credentials): - - def __init__(self): - creds = dict( - username='fake_username', - password='fake_password', - tenant_name='fake_tenant_name' - ) - super(FakeKeystoneV2Credentials, self).__init__(**creds) - - -class FakeKeystoneV3Credentials(auth.KeystoneV3Credentials): - """ - Fake credentials suitable for the Keystone Identity V3 API - """ - - def __init__(self): - creds = dict( - username='fake_username', - password='fake_password', - user_domain_name='fake_domain_name', - project_name='fake_tenant_name', - project_domain_name='fake_domain_name' - ) - super(FakeKeystoneV3Credentials, self).__init__(**creds) - - -class FakeKeystoneV3DomainCredentials(auth.KeystoneV3Credentials): - """ - Fake credentials suitable for the Keystone Identity V3 API, with no scope - """ - - def __init__(self): - creds = dict( - username='fake_username', - password='fake_password', - user_domain_name='fake_domain_name' - ) - super(FakeKeystoneV3DomainCredentials, self).__init__(**creds) diff --git a/tempest/tests/test_auth.py b/tempest/tests/test_auth.py deleted file mode 100644 index eb63b30004..0000000000 --- a/tempest/tests/test_auth.py +++ /dev/null @@ -1,411 +0,0 @@ -# Copyright 2014 IBM Corp. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import copy -import datetime - -from oslotest import mockpatch - -from tempest import auth -from tempest import exceptions -from tempest.services.identity.v2.json import token_client as v2_client -from tempest.services.identity.v3.json import token_client as v3_client -from tempest.tests import base -from tempest.tests import fake_credentials -from tempest.tests import fake_http -from tempest.tests import fake_identity - - -def fake_get_credentials(fill_in=True, identity_version='v2', **kwargs): - return fake_credentials.FakeCredentials() - - -class BaseAuthTestsSetUp(base.TestCase): - _auth_provider_class = None - credentials = fake_credentials.FakeCredentials() - - def _auth(self, credentials, auth_url, **params): - """ - returns auth method according to keystone - """ - return self._auth_provider_class(credentials, auth_url, **params) - - def setUp(self): - super(BaseAuthTestsSetUp, self).setUp() - self.fake_http = fake_http.fake_httplib2(return_type=200) - self.stubs.Set(auth, 'get_credentials', fake_get_credentials) - self.auth_provider = self._auth(self.credentials, - fake_identity.FAKE_AUTH_URL) - - -class TestBaseAuthProvider(BaseAuthTestsSetUp): - """ - This tests auth.AuthProvider class which is base for the other so we - obviously don't test not implemented method or the ones which strongly - depends on them. - """ - - class FakeAuthProviderImpl(auth.AuthProvider): - def _decorate_request(): - pass - - def _fill_credentials(): - pass - - def _get_auth(): - pass - - def base_url(): - pass - - def is_expired(): - pass - - _auth_provider_class = FakeAuthProviderImpl - - def _auth(self, credentials, auth_url, **params): - """ - returns auth method according to keystone - """ - return self._auth_provider_class(credentials, **params) - - def test_check_credentials_bad_type(self): - self.assertFalse(self.auth_provider.check_credentials([])) - - def test_auth_data_property_when_cache_exists(self): - self.auth_provider.cache = 'foo' - self.useFixture(mockpatch.PatchObject(self.auth_provider, - 'is_expired', - return_value=False)) - self.assertEqual('foo', getattr(self.auth_provider, 'auth_data')) - - def test_delete_auth_data_property_through_deleter(self): - self.auth_provider.cache = 'foo' - del self.auth_provider.auth_data - self.assertIsNone(self.auth_provider.cache) - - def test_delete_auth_data_property_through_clear_auth(self): - self.auth_provider.cache = 'foo' - self.auth_provider.clear_auth() - self.assertIsNone(self.auth_provider.cache) - - def test_set_and_reset_alt_auth_data(self): - self.auth_provider.set_alt_auth_data('foo', 'bar') - self.assertEqual(self.auth_provider.alt_part, 'foo') - self.assertEqual(self.auth_provider.alt_auth_data, 'bar') - - self.auth_provider.reset_alt_auth_data() - self.assertIsNone(self.auth_provider.alt_part) - self.assertIsNone(self.auth_provider.alt_auth_data) - - def test_auth_class(self): - self.assertRaises(TypeError, - auth.AuthProvider, - fake_credentials.FakeCredentials) - - -class TestKeystoneV2AuthProvider(BaseAuthTestsSetUp): - _endpoints = fake_identity.IDENTITY_V2_RESPONSE['access']['serviceCatalog'] - _auth_provider_class = auth.KeystoneV2AuthProvider - credentials = fake_credentials.FakeKeystoneV2Credentials() - - def setUp(self): - super(TestKeystoneV2AuthProvider, self).setUp() - self.stubs.Set(v2_client.TokenClientJSON, 'raw_request', - fake_identity._fake_v2_response) - self.target_url = 'test_api' - - def _get_fake_alt_identity(self): - return fake_identity.ALT_IDENTITY_V2_RESPONSE['access'] - - def _get_result_url_from_endpoint(self, ep, endpoint_type='publicURL', - replacement=None): - if replacement: - return ep[endpoint_type].replace('v2', replacement) - return ep[endpoint_type] - - def _get_token_from_fake_identity(self): - return fake_identity.TOKEN - - def _get_from_fake_identity(self, attr): - access = fake_identity.IDENTITY_V2_RESPONSE['access'] - if attr == 'user_id': - return access['user']['id'] - elif attr == 'tenant_id': - return access['token']['tenant']['id'] - - def _test_request_helper(self, filters, expected): - url, headers, body = self.auth_provider.auth_request('GET', - self.target_url, - filters=filters) - - self.assertEqual(expected['url'], url) - self.assertEqual(expected['token'], headers['X-Auth-Token']) - self.assertEqual(expected['body'], body) - - def _auth_data_with_expiry(self, date_as_string): - token, access = self.auth_provider.auth_data - access['token']['expires'] = date_as_string - return token, access - - def test_request(self): - filters = { - 'service': 'compute', - 'endpoint_type': 'publicURL', - 'region': 'FakeRegion' - } - - url = self._get_result_url_from_endpoint( - self._endpoints[0]['endpoints'][1]) + '/' + self.target_url - - expected = { - 'body': None, - 'url': url, - 'token': self._get_token_from_fake_identity(), - } - self._test_request_helper(filters, expected) - - def test_request_with_alt_auth_cleans_alt(self): - self.auth_provider.set_alt_auth_data( - 'body', - (fake_identity.ALT_TOKEN, self._get_fake_alt_identity())) - self.test_request() - # Assert alt auth data is clear after it - self.assertIsNone(self.auth_provider.alt_part) - self.assertIsNone(self.auth_provider.alt_auth_data) - - def test_request_with_alt_part_without_alt_data(self): - """ - Assert that when alt_part is defined, the corresponding original - request element is kept the same. - """ - filters = { - 'service': 'compute', - 'endpoint_type': 'publicURL', - 'region': 'fakeRegion' - } - self.auth_provider.set_alt_auth_data('url', None) - - url, headers, body = self.auth_provider.auth_request('GET', - self.target_url, - filters=filters) - - self.assertEqual(url, self.target_url) - self.assertEqual(self._get_token_from_fake_identity(), - headers['X-Auth-Token']) - self.assertEqual(body, None) - - def test_request_with_bad_service(self): - filters = { - 'service': 'BAD_SERVICE', - 'endpoint_type': 'publicURL', - 'region': 'fakeRegion' - } - self.assertRaises(exceptions.EndpointNotFound, - self.auth_provider.auth_request, 'GET', - self.target_url, filters=filters) - - def test_request_without_service(self): - filters = { - 'service': None, - 'endpoint_type': 'publicURL', - 'region': 'fakeRegion' - } - self.assertRaises(exceptions.EndpointNotFound, - self.auth_provider.auth_request, 'GET', - self.target_url, filters=filters) - - def test_check_credentials_missing_attribute(self): - for attr in ['username', 'password']: - cred = copy.copy(self.credentials) - del cred[attr] - self.assertFalse(self.auth_provider.check_credentials(cred)) - - def test_fill_credentials(self): - self.auth_provider.fill_credentials() - creds = self.auth_provider.credentials - for attr in ['user_id', 'tenant_id']: - self.assertEqual(self._get_from_fake_identity(attr), - getattr(creds, attr)) - - def _test_base_url_helper(self, expected_url, filters, - auth_data=None): - - url = self.auth_provider.base_url(filters, auth_data) - self.assertEqual(url, expected_url) - - def test_base_url(self): - self.filters = { - 'service': 'compute', - 'endpoint_type': 'publicURL', - 'region': 'FakeRegion' - } - expected = self._get_result_url_from_endpoint( - self._endpoints[0]['endpoints'][1]) - self._test_base_url_helper(expected, self.filters) - - def test_base_url_to_get_admin_endpoint(self): - self.filters = { - 'service': 'compute', - 'endpoint_type': 'adminURL', - 'region': 'FakeRegion' - } - expected = self._get_result_url_from_endpoint( - self._endpoints[0]['endpoints'][1], endpoint_type='adminURL') - self._test_base_url_helper(expected, self.filters) - - def test_base_url_unknown_region(self): - """ - Assure that if the region is unknow the first endpoint is returned. - """ - self.filters = { - 'service': 'compute', - 'endpoint_type': 'publicURL', - 'region': 'AintNoBodyKnowThisRegion' - } - expected = self._get_result_url_from_endpoint( - self._endpoints[0]['endpoints'][0]) - self._test_base_url_helper(expected, self.filters) - - def test_base_url_with_non_existent_service(self): - self.filters = { - 'service': 'BAD_SERVICE', - 'endpoint_type': 'publicURL', - 'region': 'FakeRegion' - } - self.assertRaises(exceptions.EndpointNotFound, - self._test_base_url_helper, None, self.filters) - - def test_base_url_without_service(self): - self.filters = { - 'endpoint_type': 'publicURL', - 'region': 'FakeRegion' - } - self.assertRaises(exceptions.EndpointNotFound, - self._test_base_url_helper, None, self.filters) - - def test_base_url_with_api_version_filter(self): - self.filters = { - 'service': 'compute', - 'endpoint_type': 'publicURL', - 'region': 'FakeRegion', - 'api_version': 'v12' - } - expected = self._get_result_url_from_endpoint( - self._endpoints[0]['endpoints'][1], replacement='v12') - self._test_base_url_helper(expected, self.filters) - - def test_base_url_with_skip_path_filter(self): - self.filters = { - 'service': 'compute', - 'endpoint_type': 'publicURL', - 'region': 'FakeRegion', - 'skip_path': True - } - expected = 'http://fake_url/' - self._test_base_url_helper(expected, self.filters) - - def test_token_not_expired(self): - expiry_data = datetime.datetime.utcnow() + datetime.timedelta(days=1) - auth_data = self._auth_data_with_expiry( - expiry_data.strftime(self.auth_provider.EXPIRY_DATE_FORMAT)) - self.assertFalse(self.auth_provider.is_expired(auth_data)) - - def test_token_expired(self): - expiry_data = datetime.datetime.utcnow() - datetime.timedelta(hours=1) - auth_data = self._auth_data_with_expiry( - expiry_data.strftime(self.auth_provider.EXPIRY_DATE_FORMAT)) - self.assertTrue(self.auth_provider.is_expired(auth_data)) - - def test_token_not_expired_to_be_renewed(self): - expiry_data = datetime.datetime.utcnow() + \ - self.auth_provider.token_expiry_threshold / 2 - auth_data = self._auth_data_with_expiry( - expiry_data.strftime(self.auth_provider.EXPIRY_DATE_FORMAT)) - self.assertTrue(self.auth_provider.is_expired(auth_data)) - - -class TestKeystoneV3AuthProvider(TestKeystoneV2AuthProvider): - _endpoints = fake_identity.IDENTITY_V3_RESPONSE['token']['catalog'] - _auth_provider_class = auth.KeystoneV3AuthProvider - credentials = fake_credentials.FakeKeystoneV3Credentials() - - def setUp(self): - super(TestKeystoneV3AuthProvider, self).setUp() - self.stubs.Set(v3_client.V3TokenClientJSON, 'raw_request', - fake_identity._fake_v3_response) - - def _get_fake_alt_identity(self): - return fake_identity.ALT_IDENTITY_V3['token'] - - def _get_result_url_from_endpoint(self, ep, replacement=None): - if replacement: - return ep['url'].replace('v3', replacement) - return ep['url'] - - def _auth_data_with_expiry(self, date_as_string): - token, access = self.auth_provider.auth_data - access['expires_at'] = date_as_string - return token, access - - def _get_from_fake_identity(self, attr): - token = fake_identity.IDENTITY_V3_RESPONSE['token'] - if attr == 'user_id': - return token['user']['id'] - elif attr == 'project_id': - return token['project']['id'] - elif attr == 'user_domain_id': - return token['user']['domain']['id'] - elif attr == 'project_domain_id': - return token['project']['domain']['id'] - - def test_check_credentials_missing_attribute(self): - # reset credentials to fresh ones - self.credentials.reset() - for attr in ['username', 'password', 'user_domain_name', - 'project_domain_name']: - cred = copy.copy(self.credentials) - del cred[attr] - self.assertFalse(self.auth_provider.check_credentials(cred), - "Credentials should be invalid without %s" % attr) - - def test_check_domain_credentials_missing_attribute(self): - # reset credentials to fresh ones - self.credentials.reset() - domain_creds = fake_credentials.FakeKeystoneV3DomainCredentials() - for attr in ['username', 'password', 'user_domain_name']: - cred = copy.copy(domain_creds) - del cred[attr] - self.assertFalse(self.auth_provider.check_credentials(cred), - "Credentials should be invalid without %s" % attr) - - def test_fill_credentials(self): - self.auth_provider.fill_credentials() - creds = self.auth_provider.credentials - for attr in ['user_id', 'project_id', 'user_domain_id', - 'project_domain_id']: - self.assertEqual(self._get_from_fake_identity(attr), - getattr(creds, attr)) - - # Overwrites v2 test - def test_base_url_to_get_admin_endpoint(self): - self.filters = { - 'service': 'compute', - 'endpoint_type': 'admin', - 'region': 'MiddleEarthRegion' - } - expected = self._get_result_url_from_endpoint( - self._endpoints[0]['endpoints'][2]) - self._test_base_url_helper(expected, self.filters) diff --git a/tempest/tests/test_credentials.py b/tempest/tests/test_credentials.py deleted file mode 100644 index bf44d11e34..0000000000 --- a/tempest/tests/test_credentials.py +++ /dev/null @@ -1,180 +0,0 @@ -# Copyright 2014 Hewlett-Packard Development Company, L.P. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import copy - -from tempest import auth -from tempest import exceptions -from tempest.services.identity.v2.json import token_client as v2_client -from tempest.services.identity.v3.json import token_client as v3_client -from tempest.tests import base -from tempest.tests import fake_identity - - -class CredentialsTests(base.TestCase): - attributes = {} - credentials_class = auth.Credentials - - def _get_credentials(self, attributes=None): - if attributes is None: - attributes = self.attributes - return self.credentials_class(**attributes) - - def _check(self, credentials, credentials_class, filled): - # Check the right version of credentials has been returned - self.assertIsInstance(credentials, credentials_class) - # Check the id attributes are filled in - attributes = [x for x in credentials.ATTRIBUTES if ( - '_id' in x and x != 'domain_id')] - for attr in attributes: - if filled: - self.assertIsNotNone(getattr(credentials, attr)) - else: - self.assertIsNone(getattr(credentials, attr)) - - def test_create(self): - creds = self._get_credentials() - self.assertEqual(self.attributes, creds._initial) - - def test_create_invalid_attr(self): - self.assertRaises(exceptions.InvalidCredentials, - self._get_credentials, - attributes=dict(invalid='fake')) - - def test_is_valid(self): - creds = self._get_credentials() - self.assertRaises(NotImplementedError, creds.is_valid) - - -class KeystoneV2CredentialsTests(CredentialsTests): - attributes = { - 'username': 'fake_username', - 'password': 'fake_password', - 'tenant_name': 'fake_tenant_name' - } - - identity_response = fake_identity._fake_v2_response - credentials_class = auth.KeystoneV2Credentials - tokenclient_class = v2_client.TokenClientJSON - identity_version = 'v2' - - def setUp(self): - super(KeystoneV2CredentialsTests, self).setUp() - self.stubs.Set(self.tokenclient_class, 'raw_request', - self.identity_response) - - def _verify_credentials(self, credentials_class, creds_dict, filled=True): - creds = auth.get_credentials(fake_identity.FAKE_AUTH_URL, - fill_in=filled, - identity_version=self.identity_version, - **creds_dict) - self._check(creds, credentials_class, filled) - - def test_get_credentials(self): - self._verify_credentials(credentials_class=self.credentials_class, - creds_dict=self.attributes) - - def test_get_credentials_not_filled(self): - self._verify_credentials(credentials_class=self.credentials_class, - creds_dict=self.attributes, - filled=False) - - def test_is_valid(self): - creds = self._get_credentials() - self.assertTrue(creds.is_valid()) - - def _test_is_not_valid(self, ignore_key): - creds = self._get_credentials() - for attr in self.attributes.keys(): - if attr == ignore_key: - continue - temp_attr = getattr(creds, attr) - delattr(creds, attr) - self.assertFalse(creds.is_valid(), - "Credentials should be invalid without %s" % attr) - setattr(creds, attr, temp_attr) - - def test_is_not_valid(self): - # NOTE(mtreinish): A KeystoneV2 credential object is valid without - # a tenant_name. So skip that check. See tempest.auth for the valid - # credential requirements - self._test_is_not_valid('tenant_name') - - def test_reset_all_attributes(self): - creds = self._get_credentials() - initial_creds = copy.deepcopy(creds) - set_attr = creds.__dict__.keys() - missing_attr = set(creds.ATTRIBUTES).difference(set_attr) - # Set all unset attributes, then reset - for attr in missing_attr: - setattr(creds, attr, 'fake' + attr) - creds.reset() - # Check reset credentials are same as initial ones - self.assertEqual(creds, initial_creds) - - def test_reset_single_attribute(self): - creds = self._get_credentials() - initial_creds = copy.deepcopy(creds) - set_attr = creds.__dict__.keys() - missing_attr = set(creds.ATTRIBUTES).difference(set_attr) - # Set one unset attributes, then reset - for attr in missing_attr: - setattr(creds, attr, 'fake' + attr) - creds.reset() - # Check reset credentials are same as initial ones - self.assertEqual(creds, initial_creds) - - -class KeystoneV3CredentialsTests(KeystoneV2CredentialsTests): - attributes = { - 'username': 'fake_username', - 'password': 'fake_password', - 'project_name': 'fake_project_name', - 'user_domain_name': 'fake_domain_name' - } - - credentials_class = auth.KeystoneV3Credentials - identity_response = fake_identity._fake_v3_response - tokenclient_class = v3_client.V3TokenClientJSON - identity_version = 'v3' - - def test_is_not_valid(self): - # NOTE(mtreinish) For a Keystone V3 credential object a project name - # is not required to be valid, so we skip that check. See tempest.auth - # for the valid credential requirements - self._test_is_not_valid('project_name') - - def test_synced_attributes(self): - attributes = self.attributes - # Create V3 credentials with tenant instead of project, and user_domain - for attr in ['project_id', 'user_domain_id']: - attributes[attr] = 'fake_' + attr - creds = self._get_credentials(attributes) - self.assertEqual(creds.project_name, creds.tenant_name) - self.assertEqual(creds.project_id, creds.tenant_id) - self.assertEqual(creds.user_domain_name, creds.project_domain_name) - self.assertEqual(creds.user_domain_id, creds.project_domain_id) - # Replace user_domain with project_domain - del attributes['user_domain_name'] - del attributes['user_domain_id'] - del attributes['project_name'] - del attributes['project_id'] - for attr in ['project_domain_name', 'project_domain_id', - 'tenant_name', 'tenant_id']: - attributes[attr] = 'fake_' + attr - self.assertEqual(creds.tenant_name, creds.project_name) - self.assertEqual(creds.tenant_id, creds.project_id) - self.assertEqual(creds.project_domain_name, creds.user_domain_name) - self.assertEqual(creds.project_domain_id, creds.user_domain_id) diff --git a/tempest/tests/test_tenant_isolation.py b/tempest/tests/test_tenant_isolation.py index fd8718f64f..1eb33a4731 100644 --- a/tempest/tests/test_tenant_isolation.py +++ b/tempest/tests/test_tenant_isolation.py @@ -15,6 +15,7 @@ import mock from oslo_config import cfg from oslotest import mockpatch +from tempest_lib.services.identity.v2 import token_client as json_token_client from tempest.common import isolated_creds from tempest.common import service_client @@ -22,7 +23,6 @@ from tempest import config from tempest import exceptions from tempest.services.identity.v2.json import identity_client as \ json_iden_client -from tempest.services.identity.v2.json import token_client as json_token_client from tempest.services.network.json import network_client as json_network_client from tempest.tests import base from tempest.tests import fake_config