Move default creds to cred_provider

Credential types available in configuration, and how to parse
them and validate them is currently embedded in the Credentials
class hierarchy.

Move the entire logic to cred_provider, in preparation for the
migration of the auth module to tempest-lib.

The pre-provisioned account credential provider relied on
the list of configured field to find the hash for a set of
credentials. Changed that to use the list of attributes with
which the credentials were initially setup.

Change-Id: Ic3ff15799f016277e6eb59c53cf8f24f6a7be9c9
This commit is contained in:
Andrea Frittoli 2015-01-29 12:43:09 +00:00 committed by Matthew Treinish
parent 9dd5848b70
commit 9efbe95571
10 changed files with 193 additions and 146 deletions

View File

@ -440,31 +440,23 @@ class KeystoneV3AuthProvider(KeystoneAuthProvider):
datetime.datetime.utcnow()
def get_default_credentials(credential_type, fill_in=True):
"""
Returns configured credentials of the specified type
based on the configured auth_version
"""
return get_credentials(fill_in=fill_in, credential_type=credential_type)
def get_credentials(credential_type=None, fill_in=True, **kwargs):
def get_credentials(fill_in=True, **kwargs):
"""
Builds a credentials object based on the configured auth_version
:param credential_type (string): requests credentials from tempest
configuration file. Valid values are defined in
Credentials.TYPE.
:param kwargs (dict): take into account only if credential_type is
not specified or None. Dict of credential key/value pairs
:param fill_in (boolean): obtain a token and fill in all credential
details provided by the identity service. When fill_in is not
specified, credentials are not validated. Validation can be invoked
by invoking ``is_valid()``
:param kwargs (dict): Dict of credential key/value pairs
Examples:
Returns credentials from the provided parameters:
>>> get_credentials(username='foo', password='bar')
Returns credentials from tempest configuration:
>>> get_credentials(credential_type='user')
Returns credentials including IDs:
>>> get_credentials(username='foo', password='bar', fill_in=True)
"""
if CONF.identity.auth_version == 'v2':
credential_class = KeystoneV2Credentials
@ -474,10 +466,7 @@ def get_credentials(credential_type=None, fill_in=True, **kwargs):
auth_provider_class = KeystoneV3AuthProvider
else:
raise exceptions.InvalidConfiguration('Unsupported auth version')
if credential_type is not None:
creds = credential_class.get_default(credential_type)
else:
creds = credential_class(**kwargs)
creds = credential_class(**kwargs)
# Fill in the credentials fields that were not specified
if fill_in:
auth_provider = auth_provider_class(creds)
@ -490,18 +479,9 @@ class Credentials(object):
Set of credentials for accessing OpenStack services
ATTRIBUTES: list of valid class attributes representing credentials.
TYPES: types of credentials available in the configuration file.
For each key there's a tuple (section, prefix) to match the
configuration options.
"""
ATTRIBUTES = []
TYPES = {
'identity_admin': ('identity', 'admin'),
'user': ('identity', None),
'alt_user': ('identity', 'alt')
}
def __init__(self, **kwargs):
"""
@ -554,21 +534,8 @@ class Credentials(object):
except AttributeError:
return default
@classmethod
def get_default(cls, credentials_type):
if credentials_type not in cls.TYPES:
raise exceptions.InvalidCredentials()
creds = cls._get_default(credentials_type)
if not creds.is_valid():
msg = ("The %s credentials are incorrectly set in the config file."
" Double check that all required values are assigned" %
credentials_type)
raise exceptions.InvalidConfiguration(msg)
return creds
@classmethod
def _get_default(cls, credentials_type):
raise NotImplementedError
def get_init_attributes(self):
return self._initial.keys()
def is_valid(self):
raise NotImplementedError
@ -584,21 +551,8 @@ class Credentials(object):
class KeystoneV2Credentials(Credentials):
CONF_ATTRIBUTES = ['username', 'password', 'tenant_name']
ATTRIBUTES = ['user_id', 'tenant_id']
ATTRIBUTES.extend(CONF_ATTRIBUTES)
@classmethod
def _get_default(cls, credentials_type='user'):
params = {}
section, prefix = cls.TYPES[credentials_type]
for attr in cls.CONF_ATTRIBUTES:
_section = getattr(CONF, section)
if prefix is None:
params[attr] = getattr(_section, attr)
else:
params[attr] = getattr(_section, prefix + "_" + attr)
return cls(**params)
ATTRIBUTES = ['username', 'password', 'tenant_name', 'user_id',
'tenant_id']
def is_valid(self):
"""
@ -608,16 +562,15 @@ class KeystoneV2Credentials(Credentials):
return None not in (self.username, self.password)
class KeystoneV3Credentials(KeystoneV2Credentials):
class KeystoneV3Credentials(Credentials):
"""
Credentials suitable for the Keystone Identity V3 API
"""
CONF_ATTRIBUTES = ['domain_name', 'password', 'tenant_name', 'username']
ATTRIBUTES = ['project_domain_id', 'project_domain_name', 'project_id',
ATTRIBUTES = ['domain_name', 'password', 'tenant_name', 'username',
'project_domain_id', 'project_domain_name', 'project_id',
'project_name', 'tenant_id', 'tenant_name', 'user_domain_id',
'user_domain_name', 'user_id']
ATTRIBUTES.extend(CONF_ATTRIBUTES)
def __init__(self, **kwargs):
"""

View File

@ -15,7 +15,7 @@
import copy
from tempest import auth
from tempest.common import cred_provider
from tempest.common import negative_rest_client
from tempest import config
from tempest import manager
@ -378,6 +378,7 @@ class AdminManager(Manager):
def __init__(self, interface='json', service=None):
super(AdminManager, self).__init__(
credentials=auth.get_default_credentials('identity_admin'),
credentials=cred_provider.get_configured_credentials(
'identity_admin'),
interface=interface,
service=service)

View File

@ -109,9 +109,9 @@ class Accounts(cred_provider.CredentialProvider):
def get_hash(self, creds):
for _hash in self.hash_dict:
# Comparing on the attributes that are expected in the YAML
# Comparing on the attributes that were read from the YAML
if all([getattr(creds, k) == self.hash_dict[_hash][k] for k in
creds.CONF_ATTRIBUTES]):
creds.get_init_attributes()]):
return _hash
raise AttributeError('Invalid credentials %s' % creds)
@ -191,7 +191,8 @@ class NotLockingAccounts(Accounts):
creds = self.get_creds(0)
primary_credential = auth.get_credentials(**creds)
else:
primary_credential = auth.get_default_credentials('user')
primary_credential = cred_provider.get_configured_credentials(
'user')
self.isolated_creds['primary'] = primary_credential
return primary_credential
@ -202,7 +203,8 @@ class NotLockingAccounts(Accounts):
creds = self.get_creds(1)
alt_credential = auth.get_credentials(**creds)
else:
alt_credential = auth.get_default_credentials('alt_user')
alt_credential = cred_provider.get_configured_credentials(
'alt_user')
self.isolated_creds['alt'] = alt_credential
return alt_credential
@ -210,4 +212,5 @@ class NotLockingAccounts(Accounts):
self.isolated_creds = {}
def get_admin_creds(self):
return auth.get_default_credentials("identity_admin", fill_in=False)
return cred_provider.get_configured_credentials(
"identity_admin", fill_in=False)

View File

@ -16,12 +16,55 @@ import abc
import six
from tempest import auth
from tempest import config
from tempest import exceptions
from tempest.openstack.common import log as logging
CONF = config.CONF
LOG = logging.getLogger(__name__)
# Type of credentials available from configuration
CREDENTIAL_TYPES = {
'identity_admin': ('identity', 'admin'),
'user': ('identity', None),
'alt_user': ('identity', 'alt')
}
# Read credentials from configuration, builds a Credentials object
# based on the specified or configured version
def get_configured_credentials(credential_type, fill_in=True,
identity_version=None):
identity_version = identity_version or CONF.identity.auth_version
if identity_version not in ('v2', 'v3'):
raise exceptions.InvalidConfiguration(
'Unsupported auth version: %s' % identity_version)
if credential_type not in CREDENTIAL_TYPES:
raise exceptions.InvalidCredentials()
conf_attributes = ['username', 'password', 'tenant_name']
if identity_version == 'v3':
conf_attributes.append('domain_name')
# Read the parts of credentials from config
params = {}
section, prefix = CREDENTIAL_TYPES[credential_type]
for attr in conf_attributes:
_section = getattr(CONF, section)
if prefix is None:
params[attr] = getattr(_section, attr)
else:
params[attr] = getattr(_section, prefix + "_" + attr)
# Build and validate credentials. We are reading configured credentials,
# so validate them even if fill_in is False
credentials = auth.get_credentials(fill_in=fill_in, **params)
if not fill_in:
if not credentials.is_valid():
msg = ("The %s credentials are incorrectly set in the config file."
" Double check that all required values are assigned" %
credential_type)
raise exceptions.InvalidConfiguration(msg)
return credentials
@six.add_metaclass(abc.ABCMeta)
class CredentialProvider(object):

View File

@ -14,6 +14,7 @@
# under the License.
from tempest import auth
from tempest.common import cred_provider
from tempest import config
from tempest import exceptions
@ -39,7 +40,7 @@ class Manager(object):
"""
self.auth_version = CONF.identity.auth_version
if credentials is None:
self.credentials = auth.get_default_credentials('user')
self.credentials = cred_provider.get_configured_credentials('user')
else:
self.credentials = credentials
# Check if passed or default credentials are valid

View File

@ -20,8 +20,8 @@ import subprocess
import netaddr
import six
from tempest import auth
from tempest import clients
from tempest.common import cred_provider
from tempest.common import credentials
from tempest.common.utils import data_utils
from tempest.common.utils.linux import remote_client
@ -1259,8 +1259,9 @@ class OrchestrationScenarioTest(ScenarioTest):
@classmethod
def credentials(cls):
admin_creds = auth.get_default_credentials('identity_admin')
creds = auth.get_default_credentials('user')
admin_creds = cred_provider.get_configured_credentials(
'identity_admin')
creds = cred_provider.get_configured_credentials('user')
admin_creds.tenant_name = creds.tenant_name
return admin_creds

View File

@ -21,8 +21,8 @@ import unicodedata
import testscenarios
import testtools
from tempest import auth
from tempest import clients
from tempest.common import cred_provider
from tempest.common.utils import misc
from tempest import config
from tempest import exceptions
@ -101,7 +101,7 @@ class InputScenarioUtils(object):
def __init__(self):
os = clients.Manager(
auth.get_default_credentials('user', fill_in=False))
cred_provider.get_configured_credentials('user', fill_in=False))
self.images_client = os.images_client
self.flavors_client = os.flavors_client
self.image_pattern = CONF.input_scenario.image_regex

View File

@ -0,0 +1,97 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from tempest import auth
from tempest.common import cred_provider
from tempest.common import tempest_fixtures as fixtures
from tempest.services.identity.json import token_client as v2_client
from tempest.services.identity.v3.json import token_client as v3_client
from tempest.tests import fake_identity
# Note: eventually the auth module will move to tempest-lib, and so wil its
# unit tests. *CredentialsTests will be imported from tempest-lib then.
from tempest.tests import test_credentials as test_creds
class ConfiguredV2CredentialsTests(test_creds.CredentialsTests):
attributes = {
'username': 'fake_username',
'password': 'fake_password',
'tenant_name': 'fake_tenant_name'
}
identity_response = fake_identity._fake_v2_response
credentials_class = auth.KeystoneV2Credentials
tokenclient_class = v2_client.TokenClientJSON
identity_version = 'v2'
def setUp(self):
super(ConfiguredV2CredentialsTests, self).setUp()
self.stubs.Set(self.tokenclient_class, 'raw_request',
self.identity_response)
def _verify_credentials(self, credentials_class, filled=True,
identity_version=None):
for ctype in cred_provider.CREDENTIAL_TYPES:
if identity_version is None:
creds = cred_provider.get_configured_credentials(
credential_type=ctype, fill_in=filled)
else:
creds = cred_provider.get_configured_credentials(
credential_type=ctype, fill_in=filled,
identity_version=identity_version)
self._check(creds, credentials_class, filled)
def test_get_configured_credentials(self):
self.useFixture(fixtures.LockFixture('auth_version'))
self._verify_credentials(credentials_class=self.credentials_class)
def test_get_configured_credentials_unfilled(self):
self.useFixture(fixtures.LockFixture('auth_version'))
self._verify_credentials(credentials_class=self.credentials_class,
filled=False)
def test_get_configured_credentials_version(self):
# version specified and not loaded from config
self.useFixture(fixtures.LockFixture('auth_version'))
self._verify_credentials(credentials_class=self.credentials_class,
identity_version=self.identity_version)
def test_is_valid(self):
creds = self._get_credentials()
self.assertTrue(creds.is_valid())
class ConfiguredV3CredentialsTests(ConfiguredV2CredentialsTests):
attributes = {
'username': 'fake_username',
'password': 'fake_password',
'project_name': 'fake_project_name',
'user_domain_name': 'fake_domain_name'
}
credentials_class = auth.KeystoneV3Credentials
identity_response = fake_identity._fake_v3_response
tokenclient_class = v3_client.V3TokenClientJSON
identity_version = 'v3'
def setUp(self):
super(ConfiguredV3CredentialsTests, self).setUp()
# Additional config items reset by cfg fixture after each test
cfg.CONF.set_default('auth_version', 'v3', group='identity')
# Identity group items
for prefix in ['', 'alt_', 'admin_']:
cfg.CONF.set_default(prefix + 'domain_name', 'fake_domain_name',
group='identity')

View File

@ -30,11 +30,7 @@ from tempest.tests import fake_http
from tempest.tests import fake_identity
def fake_get_default_credentials(credential_type, fill_in=True):
return fake_credentials.FakeCredentials()
def fake_get_credentials(credential_type=None, fill_in=True, **kwargs):
def fake_get_credentials(fill_in=True, **kwargs):
return fake_credentials.FakeCredentials()
@ -54,8 +50,6 @@ class BaseAuthTestsSetUp(base.TestCase):
self.stubs.Set(config, 'TempestConfigPrivate', fake_config.FakePrivate)
self.fake_http = fake_http.fake_httplib2(return_type=200)
self.stubs.Set(auth, 'get_credentials', fake_get_credentials)
self.stubs.Set(auth, 'get_default_credentials',
fake_get_default_credentials)
self.auth_provider = self._auth(self.credentials)

View File

@ -37,6 +37,18 @@ class CredentialsTests(base.TestCase):
attributes = self.attributes
return self.credentials_class(**attributes)
def _check(self, credentials, credentials_class, filled):
# Check the right version of credentials has been returned
self.assertIsInstance(credentials, credentials_class)
# Check the id attributes are filled in
attributes = [x for x in credentials.ATTRIBUTES if (
'_id' in x and x != 'domain_id')]
for attr in attributes:
if filled:
self.assertIsNotNone(getattr(credentials, attr))
else:
self.assertIsNone(getattr(credentials, attr))
def setUp(self):
super(CredentialsTests, self).setUp()
self.useFixture(fake_config.ConfigFixture())
@ -51,18 +63,6 @@ class CredentialsTests(base.TestCase):
self._get_credentials,
attributes=dict(invalid='fake'))
def test_default(self):
self.useFixture(fixtures.LockFixture('auth_version'))
for ctype in self.credentials_class.TYPES:
self.assertRaises(NotImplementedError,
self.credentials_class.get_default,
credentials_type=ctype)
def test_invalid_default(self):
self.assertRaises(exceptions.InvalidCredentials,
auth.Credentials.get_default,
credentials_type='invalid_type')
def test_is_valid(self):
creds = self._get_credentials()
self.assertRaises(NotImplementedError, creds.is_valid)
@ -84,33 +84,9 @@ class KeystoneV2CredentialsTests(CredentialsTests):
self.stubs.Set(self.tokenclient_class, 'raw_request',
self.identity_response)
def _verify_credentials(self, credentials_class, filled=True,
creds_dict=None):
def _check(credentials):
# Check the right version of credentials has been returned
self.assertIsInstance(credentials, credentials_class)
# Check the id attributes are filled in
attributes = [x for x in credentials.ATTRIBUTES if (
'_id' in x and x != 'domain_id')]
for attr in attributes:
if filled:
self.assertIsNotNone(getattr(credentials, attr))
else:
self.assertIsNone(getattr(credentials, attr))
if creds_dict is None:
for ctype in auth.Credentials.TYPES:
creds = auth.get_default_credentials(credential_type=ctype,
fill_in=filled)
_check(creds)
else:
creds = auth.get_credentials(fill_in=filled, **creds_dict)
_check(creds)
def test_get_default_credentials(self):
self.useFixture(fixtures.LockFixture('auth_version'))
self._verify_credentials(credentials_class=self.credentials_class)
def _verify_credentials(self, credentials_class, creds_dict, filled=True):
creds = auth.get_credentials(fill_in=filled, **creds_dict)
self._check(creds, credentials_class, filled)
def test_get_credentials(self):
self.useFixture(fixtures.LockFixture('auth_version'))
@ -120,8 +96,8 @@ class KeystoneV2CredentialsTests(CredentialsTests):
def test_get_credentials_not_filled(self):
self.useFixture(fixtures.LockFixture('auth_version'))
self._verify_credentials(credentials_class=self.credentials_class,
filled=False,
creds_dict=self.attributes)
creds_dict=self.attributes,
filled=False)
def test_is_valid(self):
creds = self._get_credentials()
@ -144,15 +120,6 @@ class KeystoneV2CredentialsTests(CredentialsTests):
# credential requirements
self._test_is_not_valid('tenant_name')
def test_default(self):
self.useFixture(fixtures.LockFixture('auth_version'))
for ctype in self.credentials_class.TYPES:
creds = self.credentials_class.get_default(credentials_type=ctype)
for attr in self.attributes.keys():
# Default configuration values related to credentials
# are defined as fake_* in fake_config.py
self.assertEqual(getattr(creds, attr), 'fake_' + attr)
def test_reset_all_attributes(self):
creds = self._get_credentials()
initial_creds = copy.deepcopy(creds)
@ -199,19 +166,6 @@ class KeystoneV3CredentialsTests(KeystoneV2CredentialsTests):
cfg.CONF.set_default(prefix + 'domain_name', 'fake_domain_name',
group='identity')
def test_default(self):
self.useFixture(fixtures.LockFixture('auth_version'))
for ctype in self.credentials_class.TYPES:
creds = self.credentials_class.get_default(credentials_type=ctype)
for attr in self.attributes.keys():
if attr == 'project_name':
config_value = 'fake_tenant_name'
elif attr == 'user_domain_name':
config_value = 'fake_domain_name'
else:
config_value = 'fake_' + attr
self.assertEqual(getattr(creds, attr), config_value)
def test_is_not_valid(self):
# NOTE(mtreinish) For a Keystone V3 credential object a project name
# is not required to be valid, so we skip that check. See tempest.auth