Defines a Credentials class

Multiversion auth part4

Introduces a Credential class.
Base class and V2 credentials are defined. V3 in the next patch.
Using Credentials, it becomes transparent for client managers and
tests which auth version is being used.
Defines a get_default_credentials utils, which is solely responsible
for reading credentials from CONF.
Defines a get_credentials, which builds V2 credentials
based on configuration.

Credentials are not used in tests yet, only by AuthProvider.
AuthProvider supports dict credentials in this patch for
backwards compatibility. Credentials are verified via unit test.

Partially implements: bp multi-keystone-api-version-tests

Change-Id: I5f8ab7ef37c99d2c2425b8487490e3517cfce7ed
This commit is contained in:
Andrea Frittoli 2014-04-06 11:46:32 +01:00
parent a130c4b3ec
commit 7d707a5633
6 changed files with 337 additions and 38 deletions

View File

@ -43,11 +43,11 @@ class AuthProvider(object):
:param client_type: 'tempest' or 'official'
:param interface: 'json' or 'xml'. Applicable for tempest client only
"""
credentials = self._convert_credentials(credentials)
if self.check_credentials(credentials):
self.credentials = credentials
else:
raise TypeError("Invalid credentials")
self.credentials = credentials
self.client_type = client_type
self.interface = interface
if self.client_type == 'tempest' and self.interface is None:
@ -56,6 +56,13 @@ class AuthProvider(object):
self.alt_auth_data = None
self.alt_part = None
def _convert_credentials(self, credentials):
# Support dict credentials for backwards compatibility
if isinstance(credentials, dict):
return get_credentials(**credentials)
else:
return credentials
def __str__(self):
return "Creds :{creds}, client type: {client_type}, interface: " \
"{interface}, cached auth data: {cache}".format(
@ -76,9 +83,9 @@ class AuthProvider(object):
@classmethod
def check_credentials(cls, credentials):
"""
Verify credentials are valid. Subclasses can do a better check.
Verify credentials are valid.
"""
return isinstance(credentials, dict)
return isinstance(credentials, Credentials) and credentials.is_valid()
@property
def auth_data(self):
@ -218,16 +225,6 @@ class KeystoneV2AuthProvider(KeystoneAuthProvider):
EXPIRY_DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
@classmethod
def check_credentials(cls, credentials, scoped=True):
# tenant_name is optional if not scoped
valid = super(KeystoneV2AuthProvider, cls).check_credentials(
credentials) and 'username' in credentials and \
'password' in credentials
if scoped:
valid = valid and 'tenant_name' in credentials
return valid
def _auth_client(self):
if self.client_type == 'tempest':
if self.interface == 'json':
@ -240,9 +237,9 @@ class KeystoneV2AuthProvider(KeystoneAuthProvider):
def _auth_params(self):
if self.client_type == 'tempest':
return dict(
user=self.credentials['username'],
password=self.credentials['password'],
tenant=self.credentials.get('tenant_name', None),
user=self.credentials.username,
password=self.credentials.password,
tenant=self.credentials.tenant_name,
auth_data=True)
else:
raise NotImplementedError
@ -303,12 +300,15 @@ class KeystoneV3AuthProvider(KeystoneAuthProvider):
EXPIRY_DATE_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
def _convert_credentials(self, credentials):
# For V3 do not convert as V3 Credentials are not defined yet
return credentials
@classmethod
def check_credentials(cls, credentials, scoped=True):
# tenant_name is optional if not scoped
valid = super(KeystoneV3AuthProvider, cls).check_credentials(
credentials) and 'username' in credentials and \
'password' in credentials and 'domain_name' in credentials
valid = 'username' in credentials and 'password' in credentials \
and 'domain_name' in credentials
if scoped:
valid = valid and 'tenant_name' in credentials
return valid
@ -327,7 +327,7 @@ class KeystoneV3AuthProvider(KeystoneAuthProvider):
return dict(
user=self.credentials['username'],
password=self.credentials['password'],
tenant=self.credentials.get('tenant_name', None),
tenant=self.credentials['tenant_name'],
domain=self.credentials['domain_name'],
auth_data=True)
else:
@ -398,3 +398,144 @@ class KeystoneV3AuthProvider(KeystoneAuthProvider):
self.EXPIRY_DATE_FORMAT)
return expiry - self.token_expiry_threshold <= \
datetime.datetime.utcnow()
def get_credentials(credential_type=None, **kwargs):
"""
Builds a credentials object based on the configured auth_version
:param credential_type (string): requests credentials from tempest
configuration file. Valid values are defined in
Credentials.TYPE.
:param kwargs (dict): take into account only if credential_type is
not specified or None. Dict of credential key/value pairs
Examples:
Returns credentials from the provided parameters:
>>> get_credentials(username='foo', password='bar')
Returns credentials from tempest configuration:
>>> get_credentials(credential_type='user')
"""
if CONF.identity.auth_version == 'v2':
credential_class = KeystoneV2Credentials
else:
raise exceptions.InvalidConfiguration('Unsupported auth version')
if credential_type is not None:
creds = credential_class.get_default(credential_type)
else:
creds = credential_class(**kwargs)
return creds
class Credentials(object):
"""
Set of credentials for accessing OpenStack services
ATTRIBUTES: list of valid class attributes representing credentials.
TYPES: types of credentials available in the configuration file.
For each key there's a tuple (section, prefix) to match the
configuration options.
"""
ATTRIBUTES = []
TYPES = {
'identity_admin': ('identity', 'admin'),
'compute_admin': ('compute_admin', None),
'user': ('identity', None),
'alt_user': ('identity', 'alt')
}
def __init__(self, **kwargs):
"""
Enforce the available attributes at init time (only).
Additional attributes can still be set afterwards if tests need
to do so.
"""
self._apply_credentials(kwargs)
def _apply_credentials(self, attr):
for key in attr.keys():
if key in self.ATTRIBUTES:
setattr(self, key, attr[key])
else:
raise exceptions.InvalidCredentials
def __str__(self):
"""
Represent only attributes included in self.ATTRIBUTES
"""
_repr = dict((k, getattr(self, k)) for k in self.ATTRIBUTES)
return str(_repr)
def __eq__(self, other):
"""
Credentials are equal if attributes in self.ATTRIBUTES are equal
"""
return str(self) == str(other)
def __getattr__(self, key):
# If an attribute is set, __getattr__ is not invoked
# If an attribute is not set, and it is a known one, return None
if key in self.ATTRIBUTES:
return None
else:
raise AttributeError
def __delitem__(self, key):
# For backwards compatibility, support dict behaviour
if key in self.ATTRIBUTES:
delattr(self, key)
else:
raise AttributeError
def get(self, item, default):
# In this patch act as dict for backward compatibility
try:
return getattr(self, item)
except AttributeError:
return default
@classmethod
def get_default(cls, credentials_type):
if credentials_type not in cls.TYPES:
raise exceptions.InvalidCredentials()
creds = cls._get_default(credentials_type)
if not creds.is_valid():
raise exceptions.InvalidConfiguration()
return creds
@classmethod
def _get_default(cls, credentials_type):
raise NotImplementedError
def is_valid(self):
raise NotImplementedError
class KeystoneV2Credentials(Credentials):
CONF_ATTRIBUTES = ['username', 'password', 'tenant_name']
ATTRIBUTES = ['user_id', 'tenant_id']
ATTRIBUTES.extend(CONF_ATTRIBUTES)
@classmethod
def _get_default(cls, credentials_type='user'):
params = {}
section, prefix = cls.TYPES[credentials_type]
for attr in cls.CONF_ATTRIBUTES:
_section = getattr(CONF, section)
if prefix is None:
params[attr] = getattr(_section, attr)
else:
params[attr] = getattr(_section, prefix + "_" + attr)
return KeystoneV2Credentials(**params)
def is_valid(self):
"""
Minimum set of valid credentials, are username and password.
Tenant is optional.
"""
return None not in (self.username, self.password)

View File

@ -13,6 +13,12 @@
# License for the specific language governing permissions and limitations
# under the License.
from tempest.tests import fake_credentials
def get_credentials(credential_type=None, fill_in=True, **kwargs):
return fake_credentials.FakeCredentials()
class FakeAuthProvider(object):

View File

@ -45,6 +45,16 @@ class ConfigFixture(conf_fixture.Config):
os.mkdir(str(os.environ.get('OS_TEST_LOCK_PATH')))
self.conf.set_default('lock_path',
str(os.environ.get('OS_TEST_LOCK_PATH')))
self.conf.set_default('auth_version', 'v2', group='identity')
for config_option in ['username', 'password', 'tenant_name']:
# Identity group items
for prefix in ['', 'alt_', 'admin_']:
self.conf.set_default(prefix + config_option,
'fake_' + config_option,
group='identity')
# Compute Admin group items
self.conf.set_default(config_option, 'fake_' + config_option,
group='compute-admin')
class FakePrivate(config.TempestConfigPrivate):

View File

@ -0,0 +1,33 @@
# Copyright 2014 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest import auth
class FakeCredentials(auth.Credentials):
def is_valid(self):
return True
class FakeKeystoneV2Credentials(auth.KeystoneV2Credentials):
def __init__(self):
creds = dict(
username='fake_username',
password='fake_password',
tenant_name='fake_tenant_name'
)
super(FakeKeystoneV2Credentials, self).__init__(**creds)

View File

@ -22,18 +22,16 @@ from tempest import config
from tempest import exceptions
from tempest.openstack.common.fixture import mockpatch
from tempest.tests import base
from tempest.tests import fake_auth_provider
from tempest.tests import fake_config
from tempest.tests import fake_credentials
from tempest.tests import fake_http
from tempest.tests import fake_identity
class BaseAuthTestsSetUp(base.TestCase):
_auth_provider_class = None
credentials = {
'username': 'fake_user',
'password': 'fake_pwd',
'tenant_name': 'fake_tenant'
}
credentials = fake_credentials.FakeCredentials()
def _auth(self, credentials, **params):
"""
@ -47,6 +45,8 @@ class BaseAuthTestsSetUp(base.TestCase):
self.stubs.Set(config, 'TempestConfigPrivate', fake_config.FakePrivate)
self.fake_http = fake_http.fake_httplib2(return_type=200)
self.stubs.Set(http.ClosingHttp, 'request', self.fake_http.request)
self.stubs.Set(auth, 'get_credentials',
fake_auth_provider.get_credentials)
self.auth_provider = self._auth(self.credentials)
@ -58,12 +58,19 @@ class TestBaseAuthProvider(BaseAuthTestsSetUp):
"""
_auth_provider_class = auth.AuthProvider
def test_check_credentials_is_dict(self):
self.assertTrue(self.auth_provider.check_credentials({}))
def test_check_credentials_class(self):
self.assertRaises(NotImplementedError,
self.auth_provider.check_credentials,
auth.Credentials())
def test_check_credentials_bad_type(self):
self.assertFalse(self.auth_provider.check_credentials([]))
def test_instantiate_with_dict(self):
# Dict credentials are only supported for backward compatibility
auth_provider = self._auth(credentials={})
self.assertIsInstance(auth_provider.credentials, auth.Credentials)
def test_instantiate_with_bad_credentials_type(self):
"""
Assure that credentials with bad type fail with TypeError
@ -104,6 +111,7 @@ class TestBaseAuthProvider(BaseAuthTestsSetUp):
class TestKeystoneV2AuthProvider(BaseAuthTestsSetUp):
_endpoints = fake_identity.IDENTITY_V2_RESPONSE['access']['serviceCatalog']
_auth_provider_class = auth.KeystoneV2AuthProvider
credentials = fake_credentials.FakeKeystoneV2Credentials()
def setUp(self):
super(TestKeystoneV2AuthProvider, self).setUp()
@ -210,17 +218,6 @@ class TestKeystoneV2AuthProvider(BaseAuthTestsSetUp):
del cred[attr]
self.assertFalse(self.auth_provider.check_credentials(cred))
def test_check_credentials_not_scoped_missing_tenant_name(self):
cred = copy.copy(self.credentials)
del cred['tenant_name']
self.assertTrue(self.auth_provider.check_credentials(cred,
scoped=False))
def test_check_credentials_missing_tenant_name(self):
cred = copy.copy(self.credentials)
del cred['tenant_name']
self.assertFalse(self.auth_provider.check_credentials(cred))
def _test_base_url_helper(self, expected_url, filters,
auth_data=None):

View File

@ -0,0 +1,112 @@
# Copyright 2014 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest import auth
from tempest.common import http
from tempest.common import tempest_fixtures as fixtures
from tempest import config
from tempest import exceptions
from tempest.tests import base
from tempest.tests import fake_config
from tempest.tests import fake_http
from tempest.tests import fake_identity
class CredentialsTests(base.TestCase):
attributes = {}
credentials_class = auth.Credentials
def _get_credentials(self, attributes=None):
if attributes is None:
attributes = self.attributes
return self.credentials_class(**attributes)
def setUp(self):
super(CredentialsTests, self).setUp()
self.fake_http = fake_http.fake_httplib2(return_type=200)
self.stubs.Set(http.ClosingHttp, 'request', self.fake_http.request)
self.useFixture(fake_config.ConfigFixture())
self.stubs.Set(config, 'TempestConfigPrivate', fake_config.FakePrivate)
def test_create_invalid_attr(self):
self.assertRaises(exceptions.InvalidCredentials,
self._get_credentials,
attributes=dict(invalid='fake'))
def test_default(self):
self.useFixture(fixtures.LockFixture('auth_version'))
for ctype in self.credentials_class.TYPES:
self.assertRaises(NotImplementedError,
self.credentials_class.get_default,
credentials_type=ctype)
def test_invalid_default(self):
self.assertRaises(exceptions.InvalidCredentials,
auth.Credentials.get_default,
credentials_type='invalid_type')
def test_is_valid(self):
creds = self._get_credentials()
self.assertRaises(NotImplementedError, creds.is_valid)
class KeystoneV2CredentialsTests(CredentialsTests):
attributes = {
'username': 'fake_username',
'password': 'fake_password',
'tenant_name': 'fake_tenant_name'
}
identity_response = fake_identity._fake_v2_response
credentials_class = auth.KeystoneV2Credentials
def setUp(self):
super(KeystoneV2CredentialsTests, self).setUp()
self.stubs.Set(http.ClosingHttp, 'request', self.identity_response)
self.stubs.Set(config, 'TempestConfigPrivate', fake_config.FakePrivate)
def _verify_credentials(self, credentials_class, creds_dict):
creds = auth.get_credentials(**creds_dict)
# Check the right version of credentials has been returned
self.assertIsInstance(creds, credentials_class)
# Check the id attributes are filled in
attributes = [x for x in creds.ATTRIBUTES if (
'_id' in x and x != 'domain_id')]
for attr in attributes:
self.assertIsNone(getattr(creds, attr))
def test_get_credentials(self):
self.useFixture(fixtures.LockFixture('auth_version'))
self._verify_credentials(self.credentials_class, self.attributes)
def test_is_valid(self):
creds = self._get_credentials()
self.assertTrue(creds.is_valid())
def test_is_not_valid(self):
creds = self._get_credentials()
for attr in self.attributes.keys():
delattr(creds, attr)
self.assertFalse(creds.is_valid(),
"Credentials should be invalid without %s" % attr)
def test_default(self):
self.useFixture(fixtures.LockFixture('auth_version'))
for ctype in self.credentials_class.TYPES:
creds = self.credentials_class.get_default(credentials_type=ctype)
for attr in self.attributes.keys():
# Default configuration values related to credentials
# are defined as fake_* in fake_config.py
self.assertEqual(getattr(creds, attr), 'fake_' + attr)