Merge "Add support for roles to credentials providers"
This commit is contained in:
commit
7456ac6ef9
@ -9,3 +9,27 @@
|
||||
- username: 'user_2'
|
||||
tenant_name: 'test_tenant_2'
|
||||
password: 'test_password'
|
||||
|
||||
# To specify which roles a user has list them under the roles field
|
||||
- username: 'multi_role_user'
|
||||
tenant_name: 'test_tenant_42'
|
||||
password: 'test_password'
|
||||
roles:
|
||||
- 'fun_role'
|
||||
- 'not_an_admin'
|
||||
- 'an_admin'
|
||||
|
||||
# To specify a user has a role specified in the config file you can use the
|
||||
# type field to specify it, valid values are admin, operator, and reseller_admin
|
||||
- username: 'swift_pseudo_admin_user_1'
|
||||
tenant_name: 'admin_tenant_1'
|
||||
password: 'test_password'
|
||||
types:
|
||||
- 'reseller_admin'
|
||||
- 'operator'
|
||||
|
||||
- username: 'admin_user_1'
|
||||
tenant_name: 'admin_tenant_1'
|
||||
password: 'test_password'
|
||||
types:
|
||||
- 'admin'
|
||||
|
@ -48,13 +48,47 @@ class Accounts(cred_provider.CredentialProvider):
|
||||
self.accounts_dir = os.path.join(CONF.lock_path, 'test_accounts')
|
||||
self.isolated_creds = {}
|
||||
|
||||
@classmethod
|
||||
def _append_role(cls, role, account_hash, hash_dict):
|
||||
if role in hash_dict['roles']:
|
||||
hash_dict['roles'][role].append(account_hash)
|
||||
else:
|
||||
hash_dict['roles'][role] = [account_hash]
|
||||
return hash_dict
|
||||
|
||||
@classmethod
|
||||
def get_hash_dict(cls, accounts):
|
||||
hash_dict = {}
|
||||
hash_dict = {'roles': {}, 'creds': {}}
|
||||
# Loop over the accounts read from the yaml file
|
||||
for account in accounts:
|
||||
roles = []
|
||||
types = []
|
||||
if 'roles' in account:
|
||||
roles = account.pop('roles')
|
||||
if 'types' in account:
|
||||
types = account.pop('types')
|
||||
temp_hash = hashlib.md5()
|
||||
temp_hash.update(str(account))
|
||||
hash_dict[temp_hash.hexdigest()] = account
|
||||
temp_hash_key = temp_hash.hexdigest()
|
||||
hash_dict['creds'][temp_hash_key] = account
|
||||
for role in roles:
|
||||
hash_dict = cls._append_role(role, temp_hash_key,
|
||||
hash_dict)
|
||||
# If types are set for the account append the matching role
|
||||
# subdict with the hash
|
||||
for type in types:
|
||||
if type == 'admin':
|
||||
hash_dict = cls._append_role(CONF.identity.admin_role,
|
||||
temp_hash_key, hash_dict)
|
||||
elif type == 'operator':
|
||||
hash_dict = cls._append_role(
|
||||
CONF.object_storage.operator_role, temp_hash_key,
|
||||
hash_dict)
|
||||
elif type == 'reseller_admin':
|
||||
hash_dict = cls._append_role(
|
||||
CONF.object_storage.reseller_admin_role,
|
||||
temp_hash_key,
|
||||
hash_dict)
|
||||
return hash_dict
|
||||
|
||||
def is_multi_user(self):
|
||||
@ -63,7 +97,7 @@ class Accounts(cred_provider.CredentialProvider):
|
||||
raise exceptions.InvalidConfiguration(
|
||||
"Account file %s doesn't exist" % CONF.auth.test_accounts_file)
|
||||
else:
|
||||
return len(self.hash_dict) > 1
|
||||
return len(self.hash_dict['creds']) > 1
|
||||
|
||||
def is_multi_tenant(self):
|
||||
return self.is_multi_user()
|
||||
@ -78,6 +112,8 @@ class Accounts(cred_provider.CredentialProvider):
|
||||
|
||||
@lockutils.synchronized('test_accounts_io', external=True)
|
||||
def _get_free_hash(self, hashes):
|
||||
# Cast as a list because in some edge cases a set will be passed in
|
||||
hashes = list(hashes)
|
||||
if not os.path.isdir(self.accounts_dir):
|
||||
os.mkdir(self.accounts_dir)
|
||||
# Create File from first hash (since none are in use)
|
||||
@ -97,12 +133,46 @@ class Accounts(cred_provider.CredentialProvider):
|
||||
'the credentials for this allocation request' % ','.join(names))
|
||||
raise exceptions.InvalidConfiguration(msg)
|
||||
|
||||
def _get_creds(self):
|
||||
def _get_match_hash_list(self, roles=None):
|
||||
hashes = []
|
||||
if roles:
|
||||
# Loop over all the creds for each role in the subdict and generate
|
||||
# a list of cred lists for each role
|
||||
for role in roles:
|
||||
temp_hashes = self.hash_dict['roles'].get(role, None)
|
||||
if not temp_hashes:
|
||||
raise exceptions.InvalidConfiguration(
|
||||
"No credentials with role: %s specified in the "
|
||||
"accounts ""file" % role)
|
||||
hashes.append(temp_hashes)
|
||||
# Take the list of lists and do a boolean and between each list to
|
||||
# find the creds which fall under all the specified roles
|
||||
temp_list = set(hashes[0])
|
||||
for hash_list in hashes[1:]:
|
||||
temp_list = temp_list & set(hash_list)
|
||||
hashes = temp_list
|
||||
else:
|
||||
hashes = self.hash_dict['creds'].keys()
|
||||
# NOTE(mtreinish): admin is a special case because of the increased
|
||||
# privlege set which could potentially cause issues on tests where that
|
||||
# is not expected. So unless the admin role isn't specified do not
|
||||
# allocate admin.
|
||||
admin_hashes = self.hash_dict['roles'].get(CONF.identity.admin_role,
|
||||
None)
|
||||
if ((not roles or CONF.identity.admin_role not in roles) and
|
||||
admin_hashes):
|
||||
useable_hashes = [x for x in hashes if x not in admin_hashes]
|
||||
else:
|
||||
useable_hashes = hashes
|
||||
return useable_hashes
|
||||
|
||||
def _get_creds(self, roles=None):
|
||||
if self.use_default_creds:
|
||||
raise exceptions.InvalidConfiguration(
|
||||
"Account file %s doesn't exist" % CONF.auth.test_accounts_file)
|
||||
free_hash = self._get_free_hash(self.hash_dict.keys())
|
||||
return self.hash_dict[free_hash]
|
||||
useable_hashes = self._get_match_hash_list(roles)
|
||||
free_hash = self._get_free_hash(useable_hashes)
|
||||
return self.hash_dict['creds'][free_hash]
|
||||
|
||||
@lockutils.synchronized('test_accounts_io', external=True)
|
||||
def remove_hash(self, hash_string):
|
||||
@ -116,10 +186,10 @@ class Accounts(cred_provider.CredentialProvider):
|
||||
os.rmdir(self.accounts_dir)
|
||||
|
||||
def get_hash(self, creds):
|
||||
for _hash in self.hash_dict:
|
||||
# Comparing on the attributes that were read from the YAML
|
||||
if all([getattr(creds, k) == self.hash_dict[_hash][k] for k in
|
||||
creds.get_init_attributes()]):
|
||||
for _hash in self.hash_dict['creds']:
|
||||
# Comparing on the attributes that are expected in the YAML
|
||||
if all([getattr(creds, k) == self.hash_dict['creds'][_hash][k] for
|
||||
k in creds.get_init_attributes()]):
|
||||
return _hash
|
||||
raise AttributeError('Invalid credentials %s' % creds)
|
||||
|
||||
@ -143,14 +213,33 @@ class Accounts(cred_provider.CredentialProvider):
|
||||
self.isolated_creds['alt'] = alt_credential
|
||||
return alt_credential
|
||||
|
||||
def get_creds_by_roles(self, roles, force_new=False):
|
||||
roles = list(set(roles))
|
||||
exist_creds = self.isolated_creds.get(str(roles), None)
|
||||
# The force kwarg is used to allocate an additional set of creds with
|
||||
# the same role list. The index used for the previously allocation
|
||||
# in the isolated_creds dict will be moved.
|
||||
if exist_creds and not force_new:
|
||||
return exist_creds
|
||||
elif exist_creds and force_new:
|
||||
new_index = str(roles) + '-' + str(len(self.isolated_creds))
|
||||
self.isolated_creds[new_index] = exist_creds
|
||||
creds = self._get_creds(roles=roles)
|
||||
role_credential = cred_provider.get_credentials(**creds)
|
||||
self.isolated_creds[str(roles)] = role_credential
|
||||
return role_credential
|
||||
|
||||
def clear_isolated_creds(self):
|
||||
for creds in self.isolated_creds.values():
|
||||
self.remove_credentials(creds)
|
||||
|
||||
def get_admin_creds(self):
|
||||
msg = ('If admin credentials are available tenant_isolation should be'
|
||||
' used instead')
|
||||
raise NotImplementedError(msg)
|
||||
return self.get_creds_by_roles([CONF.identity.admin_role])
|
||||
|
||||
def admin_available(self):
|
||||
if not self.hash_dict['roles'].get(CONF.identity.admin_role):
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
class NotLockingAccounts(Accounts):
|
||||
@ -173,7 +262,7 @@ class NotLockingAccounts(Accounts):
|
||||
raise exceptions.InvalidConfiguration(msg)
|
||||
else:
|
||||
# TODO(andreaf) Add a uniqueness check here
|
||||
return len(self.hash_dict) > 1
|
||||
return len(self.hash_dict['creds']) > 1
|
||||
|
||||
def is_multi_user(self):
|
||||
return self._unique_creds('username')
|
||||
@ -181,16 +270,17 @@ class NotLockingAccounts(Accounts):
|
||||
def is_multi_tenant(self):
|
||||
return self._unique_creds('tenant_id')
|
||||
|
||||
def get_creds(self, id):
|
||||
def get_creds(self, id, roles=None):
|
||||
try:
|
||||
hashes = self._get_match_hash_list(roles)
|
||||
# No need to sort the dict as within the same python process
|
||||
# the HASH seed won't change, so subsequent calls to keys()
|
||||
# will return the same result
|
||||
_hash = self.hash_dict.keys()[id]
|
||||
_hash = hashes[id]
|
||||
except IndexError:
|
||||
msg = 'Insufficient number of users provided'
|
||||
raise exceptions.InvalidConfiguration(msg)
|
||||
return self.hash_dict[_hash]
|
||||
return self.hash_dict['creds'][_hash]
|
||||
|
||||
def get_primary_creds(self):
|
||||
if self.isolated_creds.get('primary'):
|
||||
@ -220,5 +310,35 @@ class NotLockingAccounts(Accounts):
|
||||
self.isolated_creds = {}
|
||||
|
||||
def get_admin_creds(self):
|
||||
return cred_provider.get_configured_credentials(
|
||||
"identity_admin", fill_in=False)
|
||||
if not self.use_default_creds:
|
||||
return self.get_creds_by_roles([CONF.identity.admin_role])
|
||||
else:
|
||||
creds = cred_provider.get_configured_credentials(
|
||||
"identity_admin", fill_in=False)
|
||||
self.isolated_creds['admin'] = creds
|
||||
return creds
|
||||
|
||||
def get_creds_by_roles(self, roles, force_new=False):
|
||||
roles = list(set(roles))
|
||||
exist_creds = self.isolated_creds.get(str(roles), None)
|
||||
index = 0
|
||||
if exist_creds and not force_new:
|
||||
return exist_creds
|
||||
elif exist_creds and force_new:
|
||||
new_index = str(roles) + '-' + str(len(self.isolated_creds))
|
||||
self.isolated_creds[new_index] = exist_creds
|
||||
# Figure out how many existing creds for this roles set are present
|
||||
# use this as the index the returning hash list to ensure separate
|
||||
# creds are returned with force_new being True
|
||||
for creds_names in self.isolated_creds:
|
||||
if str(roles) in creds_names:
|
||||
index = index + 1
|
||||
if not self.use_default_creds:
|
||||
creds = self.get_creds(index, roles=roles)
|
||||
role_credential = cred_provider.get_credentials(**creds)
|
||||
self.isolated_creds[str(roles)] = role_credential
|
||||
else:
|
||||
msg = "Default credentials can not be used with specifying "\
|
||||
"credentials by roles"
|
||||
raise exceptions.InvalidConfiguration(msg)
|
||||
return role_credential
|
||||
|
@ -113,3 +113,7 @@ class CredentialProvider(object):
|
||||
@abc.abstractmethod
|
||||
def is_multi_tenant(self):
|
||||
return
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_creds_by_roles(self, roles, force_new=False):
|
||||
return
|
||||
|
@ -11,6 +11,8 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import os
|
||||
|
||||
from tempest.common import accounts
|
||||
from tempest.common import cred_provider
|
||||
from tempest.common import isolated_creds
|
||||
@ -46,24 +48,17 @@ def get_isolated_credentials(name, network_resources=None,
|
||||
# creds area vailable.
|
||||
def is_admin_available():
|
||||
is_admin = True
|
||||
# In the case of a pre-provisioned account, if even if creds were
|
||||
# configured, the admin credentials won't be available
|
||||
if (CONF.auth.locking_credentials_provider and
|
||||
not CONF.auth.allow_tenant_isolation):
|
||||
is_admin = False
|
||||
# If tenant isolation is enabled admin will be available
|
||||
if CONF.auth.allow_tenant_isolation:
|
||||
return is_admin
|
||||
# Check whether test accounts file has the admin specified or not
|
||||
elif os.path.isfile(CONF.auth.test_accounts_file):
|
||||
check_accounts = accounts.Accounts(name='check_admin')
|
||||
if not check_accounts.admin_available():
|
||||
is_admin = False
|
||||
else:
|
||||
try:
|
||||
cred_provider.get_configured_credentials('identity_admin')
|
||||
# NOTE(mtreinish) This should never be caught because of the if above.
|
||||
# NotImplementedError is only raised if admin credentials are requested
|
||||
# and the locking test accounts cred provider is being used.
|
||||
except NotImplementedError:
|
||||
is_admin = False
|
||||
# NOTE(mtreinish): This will be raised by the non-locking accounts
|
||||
# provider if there aren't admin credentials provided in the config
|
||||
# file. This exception originates from the auth call to get configured
|
||||
# credentials
|
||||
except exceptions.InvalidConfiguration:
|
||||
is_admin = False
|
||||
|
||||
return is_admin
|
||||
|
@ -90,7 +90,7 @@ class IsolatedCreds(cred_provider.CredentialProvider):
|
||||
self._cleanup_default_secgroup(tenant)
|
||||
self.identity_admin_client.delete_tenant(tenant)
|
||||
|
||||
def _create_creds(self, suffix="", admin=False):
|
||||
def _create_creds(self, suffix="", admin=False, roles=None):
|
||||
"""Create random credentials under the following schema.
|
||||
|
||||
If the name contains a '.' is the full class path of something, and
|
||||
@ -121,8 +121,13 @@ class IsolatedCreds(cred_provider.CredentialProvider):
|
||||
self._assign_user_role(tenant, user, swift_operator_role)
|
||||
if admin:
|
||||
self._assign_user_role(tenant, user, CONF.identity.admin_role)
|
||||
for role in CONF.auth.tempest_roles:
|
||||
self._assign_user_role(tenant, user, role)
|
||||
# Add roles specified in config file
|
||||
for conf_role in CONF.auth.tempest_roles:
|
||||
self._assign_user_role(tenant, user, conf_role)
|
||||
# Add roles requested by caller
|
||||
if roles:
|
||||
for role in roles:
|
||||
self._assign_user_role(tenant, user, role)
|
||||
return self._get_credentials(user, tenant)
|
||||
|
||||
def _get_credentials(self, user, tenant):
|
||||
@ -247,12 +252,15 @@ class IsolatedCreds(cred_provider.CredentialProvider):
|
||||
return self.isolated_net_resources.get('alt')[2]
|
||||
|
||||
def get_credentials(self, credential_type):
|
||||
if self.isolated_creds.get(credential_type):
|
||||
credentials = self.isolated_creds[credential_type]
|
||||
if self.isolated_creds.get(str(credential_type)):
|
||||
credentials = self.isolated_creds[str(credential_type)]
|
||||
else:
|
||||
is_admin = (credential_type == 'admin')
|
||||
credentials = self._create_creds(admin=is_admin)
|
||||
self.isolated_creds[credential_type] = credentials
|
||||
if credential_type in ['primary', 'alt', 'admin']:
|
||||
is_admin = (credential_type == 'admin')
|
||||
credentials = self._create_creds(admin=is_admin)
|
||||
else:
|
||||
credentials = self._create_creds(roles=credential_type)
|
||||
self.isolated_creds[str(credential_type)] = credentials
|
||||
# Maintained until tests are ported
|
||||
LOG.info("Acquired isolated creds:\n credentials: %s"
|
||||
% credentials)
|
||||
@ -260,7 +268,7 @@ class IsolatedCreds(cred_provider.CredentialProvider):
|
||||
not CONF.baremetal.driver_enabled):
|
||||
network, subnet, router = self._create_network_resources(
|
||||
credentials.tenant_id)
|
||||
self.isolated_net_resources[credential_type] = (
|
||||
self.isolated_net_resources[str(credential_type)] = (
|
||||
network, subnet, router,)
|
||||
LOG.info("Created isolated network resources for : \n"
|
||||
+ " credentials: %s" % credentials)
|
||||
@ -275,6 +283,26 @@ class IsolatedCreds(cred_provider.CredentialProvider):
|
||||
def get_alt_creds(self):
|
||||
return self.get_credentials('alt')
|
||||
|
||||
def get_creds_by_roles(self, roles, force_new=False):
|
||||
roles = list(set(roles))
|
||||
# The roles list as a str will become the index as the dict key for
|
||||
# the created credentials set in the isolated_creds dict.
|
||||
exist_creds = self.isolated_creds.get(str(roles))
|
||||
# If force_new flag is True 2 cred sets with the same roles are needed
|
||||
# handle this by creating a separate index for old one to store it
|
||||
# separately for cleanup
|
||||
if exist_creds and force_new:
|
||||
new_index = str(roles) + '-' + str(len(self.isolated_creds))
|
||||
self.isolated_creds[new_index] = exist_creds
|
||||
del self.isolated_creds[str(roles)]
|
||||
# Handle isolated neutron resouces if they exist too
|
||||
if CONF.service_available.neutron:
|
||||
exist_net = self.isolated_net_resources.get(str(roles))
|
||||
if exist_net:
|
||||
self.isolated_net_resources[new_index] = exist_net
|
||||
del self.isolated_net_resources[str(roles)]
|
||||
return self.get_credentials(roles)
|
||||
|
||||
def _clear_isolated_router(self, router_id, router_name):
|
||||
net_client = self.network_admin_client
|
||||
try:
|
||||
|
@ -51,7 +51,19 @@ class TestAccount(base.TestCase):
|
||||
{'username': 'test_user5', 'tenant_name': 'test_tenant5',
|
||||
'password': 'p'},
|
||||
{'username': 'test_user6', 'tenant_name': 'test_tenant6',
|
||||
'password': 'p'},
|
||||
'password': 'p', 'roles': ['role1', 'role2']},
|
||||
{'username': 'test_user7', 'tenant_name': 'test_tenant7',
|
||||
'password': 'p', 'roles': ['role2', 'role3']},
|
||||
{'username': 'test_user8', 'tenant_name': 'test_tenant8',
|
||||
'password': 'p', 'roles': ['role4', 'role1']},
|
||||
{'username': 'test_user9', 'tenant_name': 'test_tenant9',
|
||||
'password': 'p', 'roles': ['role1', 'role2', 'role3', 'role4']},
|
||||
{'username': 'test_user10', 'tenant_name': 'test_tenant10',
|
||||
'password': 'p', 'roles': ['role1', 'role2', 'role3', 'role4']},
|
||||
{'username': 'test_user11', 'tenant_name': 'test_tenant11',
|
||||
'password': 'p', 'roles': [cfg.CONF.identity.admin_role]},
|
||||
{'username': 'test_user12', 'tenant_name': 'test_tenant12',
|
||||
'password': 'p', 'roles': [cfg.CONF.identity.admin_role]},
|
||||
]
|
||||
self.useFixture(mockpatch.Patch(
|
||||
'tempest.common.accounts.read_accounts_yaml',
|
||||
@ -64,7 +76,8 @@ class TestAccount(base.TestCase):
|
||||
for account in accounts_list:
|
||||
hash = hashlib.md5()
|
||||
hash.update(str(account))
|
||||
hash_list.append(hash.hexdigest())
|
||||
temp_hash = hash.hexdigest()
|
||||
hash_list.append(temp_hash)
|
||||
return hash_list
|
||||
|
||||
def test_get_hash(self):
|
||||
@ -83,8 +96,8 @@ class TestAccount(base.TestCase):
|
||||
hash_dict = test_account_class.get_hash_dict(self.test_accounts)
|
||||
hash_list = self._get_hash_list(self.test_accounts)
|
||||
for hash in hash_list:
|
||||
self.assertIn(hash, hash_dict.keys())
|
||||
self.assertIn(hash_dict[hash], self.test_accounts)
|
||||
self.assertIn(hash, hash_dict['creds'].keys())
|
||||
self.assertIn(hash_dict['creds'][hash], self.test_accounts)
|
||||
|
||||
def test_create_hash_file_previous_file(self):
|
||||
# Emulate the lock existing on the filesystem
|
||||
@ -201,6 +214,62 @@ class TestAccount(base.TestCase):
|
||||
test_accounts_class = accounts.Accounts('test_name')
|
||||
self.assertFalse(test_accounts_class.is_multi_user())
|
||||
|
||||
def test__get_creds_by_roles_one_role(self):
|
||||
self.useFixture(mockpatch.Patch(
|
||||
'tempest.common.accounts.read_accounts_yaml',
|
||||
return_value=self.test_accounts))
|
||||
test_accounts_class = accounts.Accounts('test_name')
|
||||
hashes = test_accounts_class.hash_dict['roles']['role4']
|
||||
temp_hash = hashes[0]
|
||||
get_free_hash_mock = self.useFixture(mockpatch.PatchObject(
|
||||
test_accounts_class, '_get_free_hash', return_value=temp_hash))
|
||||
# Test a single role returns all matching roles
|
||||
test_accounts_class._get_creds(roles=['role4'])
|
||||
calls = get_free_hash_mock.mock.mock_calls
|
||||
self.assertEqual(len(calls), 1)
|
||||
args = calls[0][1][0]
|
||||
for i in hashes:
|
||||
self.assertIn(i, args)
|
||||
|
||||
def test__get_creds_by_roles_list_role(self):
|
||||
self.useFixture(mockpatch.Patch(
|
||||
'tempest.common.accounts.read_accounts_yaml',
|
||||
return_value=self.test_accounts))
|
||||
test_accounts_class = accounts.Accounts('test_name')
|
||||
hashes = test_accounts_class.hash_dict['roles']['role4']
|
||||
hashes2 = test_accounts_class.hash_dict['roles']['role2']
|
||||
hashes = list(set(hashes) & set(hashes2))
|
||||
temp_hash = hashes[0]
|
||||
get_free_hash_mock = self.useFixture(mockpatch.PatchObject(
|
||||
test_accounts_class, '_get_free_hash', return_value=temp_hash))
|
||||
# Test an intersection of multiple roles
|
||||
test_accounts_class._get_creds(roles=['role2', 'role4'])
|
||||
calls = get_free_hash_mock.mock.mock_calls
|
||||
self.assertEqual(len(calls), 1)
|
||||
args = calls[0][1][0]
|
||||
for i in hashes:
|
||||
self.assertIn(i, args)
|
||||
|
||||
def test__get_creds_by_roles_no_admin(self):
|
||||
self.useFixture(mockpatch.Patch(
|
||||
'tempest.common.accounts.read_accounts_yaml',
|
||||
return_value=self.test_accounts))
|
||||
test_accounts_class = accounts.Accounts('test_name')
|
||||
hashes = test_accounts_class.hash_dict['creds'].keys()
|
||||
admin_hashes = test_accounts_class.hash_dict['roles'][
|
||||
cfg.CONF.identity.admin_role]
|
||||
temp_hash = hashes[0]
|
||||
get_free_hash_mock = self.useFixture(mockpatch.PatchObject(
|
||||
test_accounts_class, '_get_free_hash', return_value=temp_hash))
|
||||
# Test an intersection of multiple roles
|
||||
test_accounts_class._get_creds()
|
||||
calls = get_free_hash_mock.mock.mock_calls
|
||||
self.assertEqual(len(calls), 1)
|
||||
args = calls[0][1][0]
|
||||
self.assertEqual(len(args), 10)
|
||||
for i in admin_hashes:
|
||||
self.assertNotIn(i, args)
|
||||
|
||||
|
||||
class TestNotLockingAccount(base.TestCase):
|
||||
|
||||
|
@ -75,6 +75,17 @@ class TestTenantIsolation(base.TestCase):
|
||||
{'id': '1', 'name': 'FakeRole'}]))))
|
||||
return roles_fix
|
||||
|
||||
def _mock_list_2_roles(self):
|
||||
roles_fix = self.useFixture(mockpatch.PatchObject(
|
||||
json_iden_client.IdentityClientJSON,
|
||||
'list_roles',
|
||||
return_value=(service_client.ResponseBodyList
|
||||
(200,
|
||||
[{'id': '1234', 'name': 'role1'},
|
||||
{'id': '1', 'name': 'FakeRole'},
|
||||
{'id': '12345', 'name': 'role2'}]))))
|
||||
return roles_fix
|
||||
|
||||
def _mock_assign_user_role(self):
|
||||
tenant_fix = self.useFixture(mockpatch.PatchObject(
|
||||
json_iden_client.IdentityClientJSON,
|
||||
@ -153,6 +164,35 @@ class TestTenantIsolation(base.TestCase):
|
||||
self.assertEqual(admin_creds.tenant_id, '1234')
|
||||
self.assertEqual(admin_creds.user_id, '1234')
|
||||
|
||||
@mock.patch('tempest_lib.common.rest_client.RestClient')
|
||||
def test_role_creds(self, MockRestClient):
|
||||
cfg.CONF.set_default('neutron', False, 'service_available')
|
||||
iso_creds = isolated_creds.IsolatedCreds('test class',
|
||||
password='fake_password')
|
||||
self._mock_list_2_roles()
|
||||
self._mock_user_create('1234', 'fake_role_user')
|
||||
self._mock_tenant_create('1234', 'fake_role_tenant')
|
||||
|
||||
user_mock = mock.patch.object(json_iden_client.IdentityClientJSON,
|
||||
'assign_user_role')
|
||||
user_mock.start()
|
||||
self.addCleanup(user_mock.stop)
|
||||
with mock.patch.object(json_iden_client.IdentityClientJSON,
|
||||
'assign_user_role') as user_mock:
|
||||
role_creds = iso_creds.get_creds_by_roles(roles=['role1', 'role2'])
|
||||
calls = user_mock.mock_calls
|
||||
# Assert that the role creation is called with the 2 specified roles
|
||||
self.assertEqual(len(calls), 3)
|
||||
args = map(lambda x: x[1], calls)
|
||||
self.assertIn(('1234', '1234', '1'), args)
|
||||
self.assertIn(('1234', '1234', '1234'), args)
|
||||
self.assertIn(('1234', '1234', '12345'), args)
|
||||
self.assertEqual(role_creds.username, 'fake_role_user')
|
||||
self.assertEqual(role_creds.tenant_name, 'fake_role_tenant')
|
||||
# Verify IDs
|
||||
self.assertEqual(role_creds.tenant_id, '1234')
|
||||
self.assertEqual(role_creds.user_id, '1234')
|
||||
|
||||
@mock.patch('tempest_lib.common.rest_client.RestClient')
|
||||
def test_all_cred_cleanup(self, MockRestClient):
|
||||
cfg.CONF.set_default('neutron', False, 'service_available')
|
||||
|
Loading…
x
Reference in New Issue
Block a user