Support scope in dynamic cred for specific roles
We already added scope support for 'admin', 'member', and 'reader' role. This commit is to adds the scope support for specific roles, basically in get_creds_by_roles(). Test can now request the scope along with the number of roles using the scope as prefix in credential type. Fpr example: credentials = [['my_role', 'role1'], # this will be old style and project scoped ['project_my_role', 'role1'], # this will be project scoped ['domain_my_role', 'role1'], # this will be domain scoped ['system_my_role', 'role1']] # this will be system scoped and below is how test can access the credential manager of respective credentials type: cls.os_my_role.any_client cls.os_project_my_role.any_client cls.os_domain_my_role.any_client cls.os_system_my_role.any_client Closes-Bug: #1917168 Change-Id: I9053faa255e3680d7f870e3cdedf62fb2eb5cb1a
This commit is contained in:
parent
ef13f406b9
commit
2d0da049d0
@ -0,0 +1,36 @@
|
||||
---
|
||||
features:
|
||||
- |
|
||||
Dynamic credentials now support the scope type for specific roles
|
||||
too along with ``admin``, ``member``, ``reader`` role.
|
||||
Test can specify the scope in the prefix of ``cls.credentials`` name.
|
||||
If ``system`` is prefix in ``cls.credentials`` name then creds will
|
||||
be created with scope as ``system``. If ``domain`` is prefix in
|
||||
``cls.credentials`` name then creds will be created with scope as
|
||||
``domain`` otherwise default ``project`` scope will be used.
|
||||
For Example::
|
||||
|
||||
credentials = [['my_role', 'role1'], # this will be old style and project scoped
|
||||
['project_my_role', 'role1'], # this will be project scoped
|
||||
['domain_my_role', 'role1'], # this will be domain scoped
|
||||
['system_my_role', 'role1']] # this will be system scoped
|
||||
|
||||
And below is how test can access the credential manager of respective
|
||||
credentials type::
|
||||
|
||||
cls.os_my_role.any_client
|
||||
cls.os_project_my_role.any_client
|
||||
cls.os_domain_my_role.any_client
|
||||
cls.os_system_my_role.any_client
|
||||
|
||||
|
||||
For backward compatibility, we set the credentials manager class attribute
|
||||
in old style form too which is prefix with ``os_roles_*``, example
|
||||
``cls.os_roles_my_role`` but we recommend to use the new style attribute
|
||||
as shown above.
|
||||
issues:
|
||||
- |
|
||||
Scope support for specific role is not yet added for pre-provisioned credentials.
|
||||
fixes:
|
||||
- |
|
||||
Fixes the `bug# 1917168 <https://bugs.launchpad.net/tempest/+bug/1917168>`_
|
@ -118,7 +118,7 @@ class CredentialProvider(object, metaclass=abc.ABCMeta):
|
||||
return
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_creds_by_roles(self, roles, force_new=False):
|
||||
def get_creds_by_roles(self, roles, force_new=False, scope=None):
|
||||
return
|
||||
|
||||
@abc.abstractmethod
|
||||
|
@ -376,21 +376,25 @@ class DynamicCredentialProvider(cred_provider.CredentialProvider):
|
||||
def get_credentials(self, credential_type, scope=None):
|
||||
if not scope and self._creds.get(str(credential_type)):
|
||||
credentials = self._creds[str(credential_type)]
|
||||
elif scope and self._creds.get("%s_%s" % (scope, credential_type[0])):
|
||||
credentials = self._creds["%s_%s" % (scope, credential_type[0])]
|
||||
elif scope and (
|
||||
self._creds.get("%s_%s" % (scope, str(credential_type)))):
|
||||
credentials = self._creds["%s_%s" % (scope, str(credential_type))]
|
||||
else:
|
||||
LOG.debug("Creating new dynamic creds for scope: %s and "
|
||||
"credential_type: %s", scope, credential_type)
|
||||
if scope:
|
||||
if credential_type in [['admin'], ['alt_admin']]:
|
||||
credentials = self._create_creds(
|
||||
admin=True, scope=scope)
|
||||
else:
|
||||
cred_type = credential_type
|
||||
if credential_type in [['alt_member'], ['alt_reader']]:
|
||||
cred_type = credential_type[0][4:]
|
||||
elif credential_type in [['alt_member'], ['alt_reader']]:
|
||||
cred_type = credential_type[0][4:]
|
||||
if isinstance(cred_type, str):
|
||||
cred_type = [cred_type]
|
||||
credentials = self._create_creds(
|
||||
roles=cred_type, scope=scope)
|
||||
else:
|
||||
credentials = self._create_creds(
|
||||
roles=credential_type, scope=scope)
|
||||
elif credential_type in ['primary', 'alt', 'admin']:
|
||||
is_admin = (credential_type == 'admin')
|
||||
credentials = self._create_creds(admin=is_admin)
|
||||
@ -398,7 +402,7 @@ class DynamicCredentialProvider(cred_provider.CredentialProvider):
|
||||
credentials = self._create_creds(roles=credential_type)
|
||||
if scope:
|
||||
self._creds["%s_%s" %
|
||||
(scope, credential_type[0])] = credentials
|
||||
(scope, str(credential_type))] = credentials
|
||||
else:
|
||||
self._creds[str(credential_type)] = credentials
|
||||
# Maintained until tests are ported
|
||||
@ -464,19 +468,22 @@ class DynamicCredentialProvider(cred_provider.CredentialProvider):
|
||||
def get_project_alt_reader_creds(self):
|
||||
return self.get_credentials(['alt_reader'], scope='project')
|
||||
|
||||
def get_creds_by_roles(self, roles, force_new=False):
|
||||
def get_creds_by_roles(self, roles, force_new=False, scope=None):
|
||||
roles = list(set(roles))
|
||||
# The roles list as a str will become the index as the dict key for
|
||||
# the created credentials set in the dynamic_creds dict.
|
||||
exist_creds = self._creds.get(str(roles))
|
||||
creds_name = str(roles)
|
||||
if scope:
|
||||
creds_name = "%s_%s" % (scope, str(roles))
|
||||
exist_creds = self._creds.get(creds_name)
|
||||
# If force_new flag is True 2 cred sets with the same roles are needed
|
||||
# handle this by creating a separate index for old one to store it
|
||||
# separately for cleanup
|
||||
if exist_creds and force_new:
|
||||
new_index = str(roles) + '-' + str(len(self._creds))
|
||||
new_index = creds_name + '-' + str(len(self._creds))
|
||||
self._creds[new_index] = exist_creds
|
||||
del self._creds[str(roles)]
|
||||
return self.get_credentials(roles)
|
||||
del self._creds[creds_name]
|
||||
return self.get_credentials(roles, scope=scope)
|
||||
|
||||
def _clear_isolated_router(self, router_id, router_name):
|
||||
client = self.routers_admin_client
|
||||
|
@ -400,7 +400,7 @@ class PreProvisionedCredentialProvider(cred_provider.CredentialProvider):
|
||||
# TODO(gmann): Implement alt reader hash.
|
||||
return
|
||||
|
||||
def get_creds_by_roles(self, roles, force_new=False):
|
||||
def get_creds_by_roles(self, roles, force_new=False, scope=None):
|
||||
roles = list(set(roles))
|
||||
exist_creds = self._creds.get(str(roles).encode(
|
||||
'utf-8'), None)
|
||||
|
@ -415,8 +415,18 @@ class BaseTestCase(testtools.testcase.WithAttributes,
|
||||
'alt_manager', 'os_alt', version='Pike',
|
||||
removal_version='Queens')
|
||||
elif isinstance(credentials_type, list):
|
||||
scope = 'project'
|
||||
if credentials_type[0].startswith('system'):
|
||||
scope = 'system'
|
||||
elif credentials_type[0].startswith('domain'):
|
||||
scope = 'domain'
|
||||
manager = cls.get_client_manager(roles=credentials_type[1:],
|
||||
force_new=True)
|
||||
force_new=True,
|
||||
scope=scope)
|
||||
setattr(cls, 'os_%s' % credentials_type[0], manager)
|
||||
# TODO(gmann): Setting the old style attribute too for
|
||||
# backward compatibility but at some point we should
|
||||
# remove this.
|
||||
setattr(cls, 'os_roles_%s' % credentials_type[0], manager)
|
||||
|
||||
@classmethod
|
||||
@ -658,7 +668,7 @@ class BaseTestCase(testtools.testcase.WithAttributes,
|
||||
|
||||
@classmethod
|
||||
def get_client_manager(cls, credential_type=None, roles=None,
|
||||
force_new=None):
|
||||
force_new=None, scope=None):
|
||||
"""Returns an OpenStack client manager
|
||||
|
||||
Returns an OpenStack client manager based on either credential_type
|
||||
@ -666,6 +676,7 @@ class BaseTestCase(testtools.testcase.WithAttributes,
|
||||
credential_type 'primary'
|
||||
:param credential_type: string - primary, alt or admin
|
||||
:param roles: list of roles
|
||||
:param scope: scope for the test user
|
||||
|
||||
:returns: the created client manager
|
||||
:raises skipException: if the requested credentials are not available
|
||||
@ -684,7 +695,7 @@ class BaseTestCase(testtools.testcase.WithAttributes,
|
||||
" is not able to provide credentials with the %s role "
|
||||
"assigned." % (cls.__name__, role))
|
||||
raise cls.skipException(skip_msg)
|
||||
params = dict(roles=roles)
|
||||
params = dict(roles=roles, scope=scope)
|
||||
if force_new is not None:
|
||||
params.update(force_new=force_new)
|
||||
creds = cred_provider.get_creds_by_roles(**params)
|
||||
@ -851,7 +862,13 @@ class BaseTestCase(testtools.testcase.WithAttributes,
|
||||
if isinstance(credentials_type, six.string_types):
|
||||
manager = cls.get_client_manager(credential_type=credentials_type)
|
||||
elif isinstance(credentials_type, list):
|
||||
manager = cls.get_client_manager(roles=credentials_type[1:])
|
||||
scope = 'project'
|
||||
if credentials_type[0].startswith('system'):
|
||||
scope = 'system'
|
||||
elif credentials_type[0].startswith('domain'):
|
||||
scope = 'domain'
|
||||
manager = cls.get_client_manager(roles=credentials_type[1:],
|
||||
scope=scope)
|
||||
else:
|
||||
manager = cls.get_client_manager()
|
||||
|
||||
|
@ -291,6 +291,100 @@ class TestDynamicCredentialProvider(base.TestCase):
|
||||
self.assertEqual(role_creds.tenant_id, '1234')
|
||||
self.assertEqual(role_creds.user_id, '1234')
|
||||
|
||||
@mock.patch('tempest.lib.common.rest_client.RestClient')
|
||||
def test_role_creds_with_project_scope(self, MockRestClient):
|
||||
creds = dynamic_creds.DynamicCredentialProvider(**self.fixed_params)
|
||||
self._mock_list_2_roles()
|
||||
self._mock_user_create('1234', 'fake_role_user')
|
||||
self._mock_tenant_create('1234', 'fake_role_project')
|
||||
|
||||
user_mock = mock.patch.object(self.roles_client.RolesClient,
|
||||
'create_user_role_on_project')
|
||||
user_mock.start()
|
||||
self.addCleanup(user_mock.stop)
|
||||
with mock.patch.object(self.roles_client.RolesClient,
|
||||
'create_user_role_on_project') as user_mock:
|
||||
role_creds = creds.get_creds_by_roles(
|
||||
roles=['role1', 'role2'], scope='project')
|
||||
calls = user_mock.mock_calls
|
||||
# Assert that the role creation is called with the 2 specified roles
|
||||
self.assertEqual(len(calls), 2)
|
||||
args = map(lambda x: x[1], calls)
|
||||
args = list(args)
|
||||
self.assertIn(('1234', '1234', '1234'), args)
|
||||
self.assertIn(('1234', '1234', '12345'), args)
|
||||
self.assertEqual(role_creds.username, 'fake_role_user')
|
||||
self.assertEqual(role_creds.project_name, 'fake_role_project')
|
||||
# Verify IDs
|
||||
self.assertEqual(role_creds.project_id, '1234')
|
||||
self.assertEqual(role_creds.user_id, '1234')
|
||||
|
||||
@mock.patch('tempest.lib.common.rest_client.RestClient')
|
||||
def _test_get_same_role_creds_with_project_scope(self, MockRestClient,
|
||||
scope=None):
|
||||
creds = dynamic_creds.DynamicCredentialProvider(**self.fixed_params)
|
||||
self._mock_list_2_roles()
|
||||
self._mock_user_create('1234', 'fake_role_user')
|
||||
self._mock_tenant_create('1234', 'fake_role_project')
|
||||
with mock.patch.object(self.roles_client.RolesClient,
|
||||
'create_user_role_on_project') as user_mock:
|
||||
role_creds = creds.get_creds_by_roles(
|
||||
roles=['role1', 'role2'], scope=scope)
|
||||
calls = user_mock.mock_calls
|
||||
# Assert that the role creation is called with the 2 specified roles
|
||||
self.assertEqual(len(calls), 2)
|
||||
|
||||
# Fetch the same creds again
|
||||
with mock.patch.object(self.roles_client.RolesClient,
|
||||
'create_user_role_on_project') as user_mock1:
|
||||
role_creds_new = creds.get_creds_by_roles(
|
||||
roles=['role1', 'role2'], scope=scope)
|
||||
calls = user_mock1.mock_calls
|
||||
# Assert that previously created creds are return and no call to
|
||||
# role creation.
|
||||
self.assertEqual(len(calls), 0)
|
||||
# Check if previously created creds are returned.
|
||||
self.assertEqual(role_creds, role_creds_new)
|
||||
|
||||
def test_get_same_role_creds_with_project_scope(self):
|
||||
self._test_get_same_role_creds_with_project_scope(scope='project')
|
||||
|
||||
def test_get_same_role_creds_with_default_scope(self):
|
||||
self._test_get_same_role_creds_with_project_scope()
|
||||
|
||||
@mock.patch('tempest.lib.common.rest_client.RestClient')
|
||||
def _test_get_different_role_creds_with_project_scope(
|
||||
self, MockRestClient, scope=None):
|
||||
creds = dynamic_creds.DynamicCredentialProvider(**self.fixed_params)
|
||||
self._mock_list_2_roles()
|
||||
self._mock_user_create('1234', 'fake_role_user')
|
||||
self._mock_tenant_create('1234', 'fake_role_project')
|
||||
with mock.patch.object(self.roles_client.RolesClient,
|
||||
'create_user_role_on_project') as user_mock:
|
||||
role_creds = creds.get_creds_by_roles(
|
||||
roles=['role1', 'role2'], scope=scope)
|
||||
calls = user_mock.mock_calls
|
||||
# Assert that the role creation is called with the 2 specified roles
|
||||
self.assertEqual(len(calls), 2)
|
||||
# Fetch the creds with one role different
|
||||
with mock.patch.object(self.roles_client.RolesClient,
|
||||
'create_user_role_on_project') as user_mock1:
|
||||
role_creds_new = creds.get_creds_by_roles(
|
||||
roles=['role1'], scope=scope)
|
||||
calls = user_mock1.mock_calls
|
||||
# Because one role is different, assert that the role creation
|
||||
# is called with the 1 specified roles
|
||||
self.assertEqual(len(calls), 1)
|
||||
# Check new creds is created for new roles.
|
||||
self.assertNotEqual(role_creds, role_creds_new)
|
||||
|
||||
def test_get_different_role_creds_with_project_scope(self):
|
||||
self._test_get_different_role_creds_with_project_scope(
|
||||
scope='project')
|
||||
|
||||
def test_get_different_role_creds_with_default_scope(self):
|
||||
self._test_get_different_role_creds_with_project_scope()
|
||||
|
||||
@mock.patch('tempest.lib.common.rest_client.RestClient')
|
||||
def test_all_cred_cleanup(self, MockRestClient):
|
||||
creds = dynamic_creds.DynamicCredentialProvider(**self.fixed_params)
|
||||
@ -707,6 +801,232 @@ class TestDynamicCredentialProviderV3(TestDynamicCredentialProvider):
|
||||
(200, {'project': {'id': id, 'name': name}}))))
|
||||
return project_fix
|
||||
|
||||
@mock.patch('tempest.lib.common.rest_client.RestClient')
|
||||
def test_role_creds_with_system_scope(self, MockRestClient):
|
||||
creds = dynamic_creds.DynamicCredentialProvider(**self.fixed_params)
|
||||
self._mock_list_2_roles()
|
||||
self._mock_user_create('1234', 'fake_role_user')
|
||||
|
||||
with mock.patch.object(self.roles_client.RolesClient,
|
||||
'create_user_role_on_system') as user_mock:
|
||||
role_creds = creds.get_creds_by_roles(
|
||||
roles=['role1', 'role2'], scope='system')
|
||||
calls = user_mock.mock_calls
|
||||
# Assert that the role creation is called with the 2 specified roles
|
||||
self.assertEqual(len(calls), 2)
|
||||
args = map(lambda x: x[1], calls)
|
||||
args = list(args)
|
||||
self.assertIn(('1234', '1234'), args)
|
||||
self.assertIn(('1234', '12345'), args)
|
||||
self.assertEqual(role_creds.username, 'fake_role_user')
|
||||
self.assertEqual(role_creds.user_id, '1234')
|
||||
# Verify system scope
|
||||
self.assertEqual(role_creds.system, 'all')
|
||||
# Verify domain is default
|
||||
self.assertEqual(role_creds.domain_id, 'default')
|
||||
self.assertEqual(role_creds.domain_name, 'Default')
|
||||
|
||||
@mock.patch('tempest.lib.common.rest_client.RestClient')
|
||||
def test_get_same_role_creds_with_system_scope(self, MockRestClient):
|
||||
creds = dynamic_creds.DynamicCredentialProvider(**self.fixed_params)
|
||||
self._mock_list_2_roles()
|
||||
self._mock_user_create('1234', 'fake_role_user')
|
||||
with mock.patch.object(self.roles_client.RolesClient,
|
||||
'create_user_role_on_system') as user_mock:
|
||||
role_creds = creds.get_creds_by_roles(
|
||||
roles=['role1', 'role2'], scope='system')
|
||||
calls = user_mock.mock_calls
|
||||
# Assert that the role creation is called with the 2 specified roles
|
||||
self.assertEqual(len(calls), 2)
|
||||
|
||||
# Fetch the same creds again
|
||||
with mock.patch.object(self.roles_client.RolesClient,
|
||||
'create_user_role_on_system') as user_mock1:
|
||||
role_creds_new = creds.get_creds_by_roles(
|
||||
roles=['role1', 'role2'], scope='system')
|
||||
calls = user_mock1.mock_calls
|
||||
# Assert that previously created creds are return and no call to
|
||||
# role creation.
|
||||
self.assertEqual(len(calls), 0)
|
||||
# Verify system scope
|
||||
self.assertEqual(role_creds_new.system, 'all')
|
||||
# Check if previously created creds are returned.
|
||||
self.assertEqual(role_creds, role_creds_new)
|
||||
|
||||
@mock.patch('tempest.lib.common.rest_client.RestClient')
|
||||
def test_get_different_role_creds_with_system_scope(self, MockRestClient):
|
||||
creds = dynamic_creds.DynamicCredentialProvider(**self.fixed_params)
|
||||
self._mock_list_2_roles()
|
||||
self._mock_user_create('1234', 'fake_role_user')
|
||||
|
||||
with mock.patch.object(self.roles_client.RolesClient,
|
||||
'create_user_role_on_system') as user_mock:
|
||||
role_creds = creds.get_creds_by_roles(
|
||||
roles=['role1', 'role2'], scope='system')
|
||||
calls = user_mock.mock_calls
|
||||
# Assert that the role creation is called with the 2 specified roles
|
||||
self.assertEqual(len(calls), 2)
|
||||
# Verify system scope
|
||||
self.assertEqual(role_creds.system, 'all')
|
||||
# Fetch the creds with one role different
|
||||
with mock.patch.object(self.roles_client.RolesClient,
|
||||
'create_user_role_on_system') as user_mock1:
|
||||
role_creds_new = creds.get_creds_by_roles(
|
||||
roles=['role1'], scope='system')
|
||||
calls = user_mock1.mock_calls
|
||||
# Because one role is different, assert that the role creation
|
||||
# is called with the 1 specified roles
|
||||
self.assertEqual(len(calls), 1)
|
||||
# Verify Scope
|
||||
self.assertEqual(role_creds_new.system, 'all')
|
||||
# Check new creds is created for new roles.
|
||||
self.assertNotEqual(role_creds, role_creds_new)
|
||||
|
||||
@mock.patch('tempest.lib.common.rest_client.RestClient')
|
||||
def test_role_creds_with_domain_scope(self, MockRestClient):
|
||||
creds = dynamic_creds.DynamicCredentialProvider(**self.fixed_params)
|
||||
self._mock_list_2_roles()
|
||||
self._mock_user_create('1234', 'fake_role_user')
|
||||
|
||||
domain = {
|
||||
"id": '12',
|
||||
"enabled": True,
|
||||
"name": "TestDomain"
|
||||
}
|
||||
|
||||
self.useFixture(fixtures.MockPatch(
|
||||
'tempest.lib.common.cred_client.V3CredsClient.create_domain',
|
||||
return_value=domain))
|
||||
|
||||
with mock.patch.object(self.roles_client.RolesClient,
|
||||
'create_user_role_on_domain') as user_mock:
|
||||
role_creds = creds.get_creds_by_roles(
|
||||
roles=['role1', 'role2'], scope='domain')
|
||||
calls = user_mock.mock_calls
|
||||
# Assert that the role creation is called with the 2 specified roles
|
||||
self.assertEqual(len(calls), 2)
|
||||
args = map(lambda x: x[1], calls)
|
||||
args = list(args)
|
||||
self.assertIn((domain['id'], '1234', '1234'), args)
|
||||
self.assertIn((domain['id'], '1234', '12345'), args)
|
||||
self.assertEqual(role_creds.username, 'fake_role_user')
|
||||
self.assertEqual(role_creds.user_id, '1234')
|
||||
# Verify creds are under new created domain
|
||||
self.assertEqual(role_creds.domain_id, domain['id'])
|
||||
self.assertEqual(role_creds.domain_name, domain['name'])
|
||||
# Verify that Scope is None
|
||||
self.assertIsNone(role_creds.system)
|
||||
|
||||
@mock.patch('tempest.lib.common.rest_client.RestClient')
|
||||
def test_get_same_role_creds_with_domain_scope(self, MockRestClient):
|
||||
creds = dynamic_creds.DynamicCredentialProvider(**self.fixed_params)
|
||||
self._mock_list_2_roles()
|
||||
self._mock_user_create('1234', 'fake_role_user')
|
||||
|
||||
domain = {
|
||||
"id": '12',
|
||||
"enabled": True,
|
||||
"name": "TestDomain"
|
||||
}
|
||||
|
||||
self.useFixture(fixtures.MockPatch(
|
||||
'tempest.lib.common.cred_client.V3CredsClient.create_domain',
|
||||
return_value=domain))
|
||||
|
||||
with mock.patch.object(self.roles_client.RolesClient,
|
||||
'create_user_role_on_domain') as user_mock:
|
||||
role_creds = creds.get_creds_by_roles(
|
||||
roles=['role1', 'role2'], scope='domain')
|
||||
calls = user_mock.mock_calls
|
||||
# Assert that the role creation is called with the 2 specified roles
|
||||
self.assertEqual(len(calls), 2)
|
||||
self.assertEqual(role_creds.user_id, '1234')
|
||||
# Verify Scope
|
||||
self.assertIsNone(role_creds.system)
|
||||
# Fetch the same creds again
|
||||
with mock.patch.object(self.roles_client.RolesClient,
|
||||
'create_user_role_on_domain') as user_mock1:
|
||||
role_creds_new = creds.get_creds_by_roles(
|
||||
roles=['role1', 'role2'], scope='domain')
|
||||
calls = user_mock1.mock_calls
|
||||
# Assert that previously created creds are return and no call to
|
||||
# role creation.
|
||||
self.assertEqual(len(calls), 0)
|
||||
# Verify Scope
|
||||
self.assertIsNone(role_creds_new.system)
|
||||
# Check if previously created creds are returned.
|
||||
self.assertEqual(role_creds, role_creds_new)
|
||||
|
||||
@mock.patch('tempest.lib.common.rest_client.RestClient')
|
||||
def test_get_different_role_creds_with_domain_scope(self, MockRestClient):
|
||||
creds = dynamic_creds.DynamicCredentialProvider(**self.fixed_params)
|
||||
self._mock_list_2_roles()
|
||||
self._mock_user_create('1234', 'fake_role_user')
|
||||
|
||||
domain = {
|
||||
"id": '12',
|
||||
"enabled": True,
|
||||
"name": "TestDomain"
|
||||
}
|
||||
|
||||
self.useFixture(fixtures.MockPatch(
|
||||
'tempest.lib.common.cred_client.V3CredsClient.create_domain',
|
||||
return_value=domain))
|
||||
|
||||
with mock.patch.object(self.roles_client.RolesClient,
|
||||
'create_user_role_on_domain') as user_mock:
|
||||
role_creds = creds.get_creds_by_roles(
|
||||
roles=['role1', 'role2'], scope='domain')
|
||||
calls = user_mock.mock_calls
|
||||
# Assert that the role creation is called with the 2 specified roles
|
||||
self.assertEqual(len(calls), 2)
|
||||
self.assertEqual(role_creds.user_id, '1234')
|
||||
# Verify Scope
|
||||
self.assertIsNone(role_creds.system)
|
||||
# Fetch the same creds again
|
||||
with mock.patch.object(self.roles_client.RolesClient,
|
||||
'create_user_role_on_domain') as user_mock1:
|
||||
role_creds_new = creds.get_creds_by_roles(
|
||||
roles=['role1'], scope='domain')
|
||||
calls = user_mock1.mock_calls
|
||||
# Because one role is different, assert that the role creation
|
||||
# is called with the 1 specified roles
|
||||
self.assertEqual(len(calls), 1)
|
||||
# Verify Scope
|
||||
self.assertIsNone(role_creds_new.system)
|
||||
# Check new creds is created for new roles.
|
||||
self.assertNotEqual(role_creds, role_creds_new)
|
||||
|
||||
@mock.patch('tempest.lib.common.rest_client.RestClient')
|
||||
def test_get_role_creds_with_different_scope(self, MockRestClient):
|
||||
creds = dynamic_creds.DynamicCredentialProvider(**self.fixed_params)
|
||||
self._mock_list_2_roles()
|
||||
self._mock_user_create('1234', 'fake_role_user')
|
||||
self._mock_tenant_create('1234', 'fake_role_project')
|
||||
with mock.patch.object(self.roles_client.RolesClient,
|
||||
'create_user_role_on_system') as user_mock:
|
||||
role_creds = creds.get_creds_by_roles(
|
||||
roles=['role1', 'role2'], scope='system')
|
||||
calls = user_mock.mock_calls
|
||||
# Assert that the role creation is called with the 2 specified roles
|
||||
self.assertEqual(len(calls), 2)
|
||||
# Verify Scope
|
||||
self.assertEqual(role_creds.system, 'all')
|
||||
|
||||
# Fetch the same role creds but with different scope
|
||||
with mock.patch.object(self.roles_client.RolesClient,
|
||||
'create_user_role_on_project') as user_mock1:
|
||||
role_creds_new = creds.get_creds_by_roles(
|
||||
roles=['role1', 'role2'], scope='project')
|
||||
calls = user_mock1.mock_calls
|
||||
# Because scope is different, assert that the role creation
|
||||
# is called with the 2 specified roles
|
||||
self.assertEqual(len(calls), 2)
|
||||
# Verify Scope
|
||||
self.assertIsNone(role_creds_new.system)
|
||||
# Check that created creds are different
|
||||
self.assertNotEqual(role_creds, role_creds_new)
|
||||
|
||||
@mock.patch('tempest.lib.common.rest_client.RestClient')
|
||||
def test_member_role_creation_with_duplicate(self, rest_client_mock):
|
||||
creds = dynamic_creds.DynamicCredentialProvider(**self.fixed_params)
|
||||
|
@ -109,7 +109,7 @@ class TestBaseTestCase(base.TestCase):
|
||||
|
||||
test.BaseTestCase.get_tenant_network(credentials_type=creds)
|
||||
|
||||
mock_gcm.assert_called_once_with(roles=['role1'])
|
||||
mock_gcm.assert_called_once_with(roles=['role1'], scope='project')
|
||||
mock_gprov.assert_called_once_with()
|
||||
mock_gtn.assert_called_once_with(mock_prov, net_client,
|
||||
self.fixed_network_name)
|
||||
|
@ -453,6 +453,130 @@ class TestTempestBaseTestClass(base.TestCase):
|
||||
expected_creds[1][1:],
|
||||
mock_get_client_manager.mock_calls[1][2]['roles'])
|
||||
|
||||
def test_setup_credentials_with_role_and_system_scope(self):
|
||||
expected_creds = [['system_my_role', 'role1', 'role2']]
|
||||
|
||||
class SystemRoleCredentials(self.parent_test):
|
||||
credentials = expected_creds
|
||||
|
||||
expected_clients = 'clients'
|
||||
with mock.patch.object(
|
||||
SystemRoleCredentials,
|
||||
'get_client_manager') as mock_get_client_manager:
|
||||
mock_get_client_manager.return_value = expected_clients
|
||||
sys_creds = SystemRoleCredentials()
|
||||
sys_creds.setup_credentials()
|
||||
self.assertTrue(hasattr(sys_creds, 'os_system_my_role'))
|
||||
self.assertEqual(expected_clients, sys_creds.os_system_my_role)
|
||||
self.assertTrue(hasattr(sys_creds, 'os_roles_system_my_role'))
|
||||
self.assertEqual(expected_clients, sys_creds.os_roles_system_my_role)
|
||||
self.assertEqual(1, mock_get_client_manager.call_count)
|
||||
self.assertEqual(
|
||||
expected_creds[0][1:],
|
||||
mock_get_client_manager.mock_calls[0][2]['roles'])
|
||||
self.assertEqual(
|
||||
'system',
|
||||
mock_get_client_manager.mock_calls[0][2]['scope'])
|
||||
|
||||
def test_setup_credentials_with_multiple_role_and_system_scope(self):
|
||||
expected_creds = [['system_my_role', 'role1', 'role2'],
|
||||
['system_my_role2', 'role1', 'role2'],
|
||||
['system_my_role3', 'role3']]
|
||||
|
||||
class SystemRoleCredentials(self.parent_test):
|
||||
credentials = expected_creds
|
||||
|
||||
expected_clients = 'clients'
|
||||
with mock.patch.object(
|
||||
SystemRoleCredentials,
|
||||
'get_client_manager') as mock_get_client_manager:
|
||||
mock_get_client_manager.return_value = expected_clients
|
||||
sys_creds = SystemRoleCredentials()
|
||||
sys_creds.setup_credentials()
|
||||
self.assertTrue(hasattr(sys_creds, 'os_system_my_role'))
|
||||
self.assertEqual(expected_clients, sys_creds.os_system_my_role)
|
||||
self.assertTrue(hasattr(sys_creds, 'os_roles_system_my_role'))
|
||||
self.assertEqual(expected_clients, sys_creds.os_roles_system_my_role)
|
||||
self.assertTrue(hasattr(sys_creds, 'os_system_my_role2'))
|
||||
self.assertEqual(expected_clients, sys_creds.os_system_my_role2)
|
||||
self.assertTrue(hasattr(sys_creds, 'os_roles_system_my_role2'))
|
||||
self.assertEqual(expected_clients, sys_creds.os_roles_system_my_role2)
|
||||
self.assertTrue(hasattr(sys_creds, 'os_system_my_role3'))
|
||||
self.assertEqual(expected_clients, sys_creds.os_system_my_role3)
|
||||
self.assertTrue(hasattr(sys_creds, 'os_roles_system_my_role3'))
|
||||
self.assertEqual(expected_clients, sys_creds.os_roles_system_my_role3)
|
||||
self.assertEqual(3, mock_get_client_manager.call_count)
|
||||
self.assertEqual(
|
||||
expected_creds[0][1:],
|
||||
mock_get_client_manager.mock_calls[0][2]['roles'])
|
||||
self.assertEqual(
|
||||
'system', mock_get_client_manager.mock_calls[0][2]['scope'])
|
||||
self.assertEqual(
|
||||
expected_creds[1][1:],
|
||||
mock_get_client_manager.mock_calls[1][2]['roles'])
|
||||
self.assertEqual(
|
||||
'system', mock_get_client_manager.mock_calls[1][2]['scope'])
|
||||
self.assertEqual(
|
||||
expected_creds[2][1:],
|
||||
mock_get_client_manager.mock_calls[2][2]['roles'])
|
||||
self.assertEqual(
|
||||
'system', mock_get_client_manager.mock_calls[2][2]['scope'])
|
||||
|
||||
def test_setup_credentials_with_role_and_multiple_scope(self):
|
||||
expected_creds = [['my_role', 'role1', 'role2'],
|
||||
['project_my_role', 'role1', 'role2'],
|
||||
['domain_my_role', 'role1', 'role2'],
|
||||
['system_my_role', 'role1', 'role2']]
|
||||
|
||||
class SystemRoleCredentials(self.parent_test):
|
||||
credentials = expected_creds
|
||||
|
||||
expected_clients = 'clients'
|
||||
with mock.patch.object(
|
||||
SystemRoleCredentials,
|
||||
'get_client_manager') as mock_get_client_manager:
|
||||
mock_get_client_manager.return_value = expected_clients
|
||||
sys_creds = SystemRoleCredentials()
|
||||
sys_creds.setup_credentials()
|
||||
self.assertTrue(hasattr(sys_creds, 'os_my_role'))
|
||||
self.assertEqual(expected_clients, sys_creds.os_my_role)
|
||||
self.assertTrue(hasattr(sys_creds, 'os_roles_my_role'))
|
||||
self.assertEqual(expected_clients, sys_creds.os_roles_my_role)
|
||||
self.assertTrue(hasattr(sys_creds, 'os_project_my_role'))
|
||||
self.assertEqual(expected_clients, sys_creds.os_project_my_role)
|
||||
self.assertTrue(hasattr(sys_creds, 'os_roles_project_my_role'))
|
||||
self.assertEqual(expected_clients, sys_creds.os_roles_project_my_role)
|
||||
self.assertTrue(hasattr(sys_creds, 'os_domain_my_role'))
|
||||
self.assertEqual(expected_clients, sys_creds.os_domain_my_role)
|
||||
self.assertTrue(hasattr(sys_creds, 'os_roles_domain_my_role'))
|
||||
self.assertEqual(expected_clients, sys_creds.os_roles_domain_my_role)
|
||||
self.assertTrue(hasattr(sys_creds, 'os_system_my_role'))
|
||||
self.assertEqual(expected_clients, sys_creds.os_system_my_role)
|
||||
self.assertTrue(hasattr(sys_creds, 'os_roles_system_my_role'))
|
||||
self.assertEqual(expected_clients, sys_creds.os_roles_system_my_role)
|
||||
|
||||
self.assertEqual(4, mock_get_client_manager.call_count)
|
||||
self.assertEqual(
|
||||
expected_creds[0][1:],
|
||||
mock_get_client_manager.mock_calls[0][2]['roles'])
|
||||
self.assertEqual(
|
||||
'project', mock_get_client_manager.mock_calls[0][2]['scope'])
|
||||
self.assertEqual(
|
||||
expected_creds[1][1:],
|
||||
mock_get_client_manager.mock_calls[1][2]['roles'])
|
||||
self.assertEqual(
|
||||
'project', mock_get_client_manager.mock_calls[1][2]['scope'])
|
||||
self.assertEqual(
|
||||
expected_creds[2][1:],
|
||||
mock_get_client_manager.mock_calls[2][2]['roles'])
|
||||
self.assertEqual(
|
||||
'domain', mock_get_client_manager.mock_calls[2][2]['scope'])
|
||||
self.assertEqual(
|
||||
expected_creds[3][1:],
|
||||
mock_get_client_manager.mock_calls[3][2]['roles'])
|
||||
self.assertEqual(
|
||||
'system', mock_get_client_manager.mock_calls[3][2]['scope'])
|
||||
|
||||
def test_setup_class_overwritten(self):
|
||||
|
||||
class OverridesSetup(self.parent_test):
|
||||
|
Loading…
Reference in New Issue
Block a user