Merge "Support scope in dynamic cred for specific roles"
This commit is contained in:
commit
22690668e1
|
@ -0,0 +1,36 @@
|
||||||
|
---
|
||||||
|
features:
|
||||||
|
- |
|
||||||
|
Dynamic credentials now support the scope type for specific roles
|
||||||
|
too along with ``admin``, ``member``, ``reader`` role.
|
||||||
|
Test can specify the scope in the prefix of ``cls.credentials`` name.
|
||||||
|
If ``system`` is prefix in ``cls.credentials`` name then creds will
|
||||||
|
be created with scope as ``system``. If ``domain`` is prefix in
|
||||||
|
``cls.credentials`` name then creds will be created with scope as
|
||||||
|
``domain`` otherwise default ``project`` scope will be used.
|
||||||
|
For Example::
|
||||||
|
|
||||||
|
credentials = [['my_role', 'role1'], # this will be old style and project scoped
|
||||||
|
['project_my_role', 'role1'], # this will be project scoped
|
||||||
|
['domain_my_role', 'role1'], # this will be domain scoped
|
||||||
|
['system_my_role', 'role1']] # this will be system scoped
|
||||||
|
|
||||||
|
And below is how test can access the credential manager of respective
|
||||||
|
credentials type::
|
||||||
|
|
||||||
|
cls.os_my_role.any_client
|
||||||
|
cls.os_project_my_role.any_client
|
||||||
|
cls.os_domain_my_role.any_client
|
||||||
|
cls.os_system_my_role.any_client
|
||||||
|
|
||||||
|
|
||||||
|
For backward compatibility, we set the credentials manager class attribute
|
||||||
|
in old style form too which is prefix with ``os_roles_*``, example
|
||||||
|
``cls.os_roles_my_role`` but we recommend to use the new style attribute
|
||||||
|
as shown above.
|
||||||
|
issues:
|
||||||
|
- |
|
||||||
|
Scope support for specific role is not yet added for pre-provisioned credentials.
|
||||||
|
fixes:
|
||||||
|
- |
|
||||||
|
Fixes the `bug# 1917168 <https://bugs.launchpad.net/tempest/+bug/1917168>`_
|
|
@ -118,7 +118,7 @@ class CredentialProvider(object, metaclass=abc.ABCMeta):
|
||||||
return
|
return
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def get_creds_by_roles(self, roles, force_new=False):
|
def get_creds_by_roles(self, roles, force_new=False, scope=None):
|
||||||
return
|
return
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
|
|
|
@ -376,21 +376,25 @@ class DynamicCredentialProvider(cred_provider.CredentialProvider):
|
||||||
def get_credentials(self, credential_type, scope=None):
|
def get_credentials(self, credential_type, scope=None):
|
||||||
if not scope and self._creds.get(str(credential_type)):
|
if not scope and self._creds.get(str(credential_type)):
|
||||||
credentials = self._creds[str(credential_type)]
|
credentials = self._creds[str(credential_type)]
|
||||||
elif scope and self._creds.get("%s_%s" % (scope, credential_type[0])):
|
elif scope and (
|
||||||
credentials = self._creds["%s_%s" % (scope, credential_type[0])]
|
self._creds.get("%s_%s" % (scope, str(credential_type)))):
|
||||||
|
credentials = self._creds["%s_%s" % (scope, str(credential_type))]
|
||||||
else:
|
else:
|
||||||
|
LOG.debug("Creating new dynamic creds for scope: %s and "
|
||||||
|
"credential_type: %s", scope, credential_type)
|
||||||
if scope:
|
if scope:
|
||||||
if credential_type in [['admin'], ['alt_admin']]:
|
if credential_type in [['admin'], ['alt_admin']]:
|
||||||
credentials = self._create_creds(
|
credentials = self._create_creds(
|
||||||
admin=True, scope=scope)
|
admin=True, scope=scope)
|
||||||
else:
|
elif credential_type in [['alt_member'], ['alt_reader']]:
|
||||||
cred_type = credential_type
|
cred_type = credential_type[0][4:]
|
||||||
if credential_type in [['alt_member'], ['alt_reader']]:
|
|
||||||
cred_type = credential_type[0][4:]
|
|
||||||
if isinstance(cred_type, str):
|
if isinstance(cred_type, str):
|
||||||
cred_type = [cred_type]
|
cred_type = [cred_type]
|
||||||
credentials = self._create_creds(
|
credentials = self._create_creds(
|
||||||
roles=cred_type, scope=scope)
|
roles=cred_type, scope=scope)
|
||||||
|
else:
|
||||||
|
credentials = self._create_creds(
|
||||||
|
roles=credential_type, scope=scope)
|
||||||
elif credential_type in ['primary', 'alt', 'admin']:
|
elif credential_type in ['primary', 'alt', 'admin']:
|
||||||
is_admin = (credential_type == 'admin')
|
is_admin = (credential_type == 'admin')
|
||||||
credentials = self._create_creds(admin=is_admin)
|
credentials = self._create_creds(admin=is_admin)
|
||||||
|
@ -398,7 +402,7 @@ class DynamicCredentialProvider(cred_provider.CredentialProvider):
|
||||||
credentials = self._create_creds(roles=credential_type)
|
credentials = self._create_creds(roles=credential_type)
|
||||||
if scope:
|
if scope:
|
||||||
self._creds["%s_%s" %
|
self._creds["%s_%s" %
|
||||||
(scope, credential_type[0])] = credentials
|
(scope, str(credential_type))] = credentials
|
||||||
else:
|
else:
|
||||||
self._creds[str(credential_type)] = credentials
|
self._creds[str(credential_type)] = credentials
|
||||||
# Maintained until tests are ported
|
# Maintained until tests are ported
|
||||||
|
@ -464,19 +468,22 @@ class DynamicCredentialProvider(cred_provider.CredentialProvider):
|
||||||
def get_project_alt_reader_creds(self):
|
def get_project_alt_reader_creds(self):
|
||||||
return self.get_credentials(['alt_reader'], scope='project')
|
return self.get_credentials(['alt_reader'], scope='project')
|
||||||
|
|
||||||
def get_creds_by_roles(self, roles, force_new=False):
|
def get_creds_by_roles(self, roles, force_new=False, scope=None):
|
||||||
roles = list(set(roles))
|
roles = list(set(roles))
|
||||||
# The roles list as a str will become the index as the dict key for
|
# The roles list as a str will become the index as the dict key for
|
||||||
# the created credentials set in the dynamic_creds dict.
|
# the created credentials set in the dynamic_creds dict.
|
||||||
exist_creds = self._creds.get(str(roles))
|
creds_name = str(roles)
|
||||||
|
if scope:
|
||||||
|
creds_name = "%s_%s" % (scope, str(roles))
|
||||||
|
exist_creds = self._creds.get(creds_name)
|
||||||
# If force_new flag is True 2 cred sets with the same roles are needed
|
# If force_new flag is True 2 cred sets with the same roles are needed
|
||||||
# handle this by creating a separate index for old one to store it
|
# handle this by creating a separate index for old one to store it
|
||||||
# separately for cleanup
|
# separately for cleanup
|
||||||
if exist_creds and force_new:
|
if exist_creds and force_new:
|
||||||
new_index = str(roles) + '-' + str(len(self._creds))
|
new_index = creds_name + '-' + str(len(self._creds))
|
||||||
self._creds[new_index] = exist_creds
|
self._creds[new_index] = exist_creds
|
||||||
del self._creds[str(roles)]
|
del self._creds[creds_name]
|
||||||
return self.get_credentials(roles)
|
return self.get_credentials(roles, scope=scope)
|
||||||
|
|
||||||
def _clear_isolated_router(self, router_id, router_name):
|
def _clear_isolated_router(self, router_id, router_name):
|
||||||
client = self.routers_admin_client
|
client = self.routers_admin_client
|
||||||
|
|
|
@ -400,7 +400,7 @@ class PreProvisionedCredentialProvider(cred_provider.CredentialProvider):
|
||||||
# TODO(gmann): Implement alt reader hash.
|
# TODO(gmann): Implement alt reader hash.
|
||||||
return
|
return
|
||||||
|
|
||||||
def get_creds_by_roles(self, roles, force_new=False):
|
def get_creds_by_roles(self, roles, force_new=False, scope=None):
|
||||||
roles = list(set(roles))
|
roles = list(set(roles))
|
||||||
exist_creds = self._creds.get(str(roles).encode(
|
exist_creds = self._creds.get(str(roles).encode(
|
||||||
'utf-8'), None)
|
'utf-8'), None)
|
||||||
|
|
|
@ -415,8 +415,18 @@ class BaseTestCase(testtools.testcase.WithAttributes,
|
||||||
'alt_manager', 'os_alt', version='Pike',
|
'alt_manager', 'os_alt', version='Pike',
|
||||||
removal_version='Queens')
|
removal_version='Queens')
|
||||||
elif isinstance(credentials_type, list):
|
elif isinstance(credentials_type, list):
|
||||||
|
scope = 'project'
|
||||||
|
if credentials_type[0].startswith('system'):
|
||||||
|
scope = 'system'
|
||||||
|
elif credentials_type[0].startswith('domain'):
|
||||||
|
scope = 'domain'
|
||||||
manager = cls.get_client_manager(roles=credentials_type[1:],
|
manager = cls.get_client_manager(roles=credentials_type[1:],
|
||||||
force_new=True)
|
force_new=True,
|
||||||
|
scope=scope)
|
||||||
|
setattr(cls, 'os_%s' % credentials_type[0], manager)
|
||||||
|
# TODO(gmann): Setting the old style attribute too for
|
||||||
|
# backward compatibility but at some point we should
|
||||||
|
# remove this.
|
||||||
setattr(cls, 'os_roles_%s' % credentials_type[0], manager)
|
setattr(cls, 'os_roles_%s' % credentials_type[0], manager)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
@ -658,7 +668,7 @@ class BaseTestCase(testtools.testcase.WithAttributes,
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def get_client_manager(cls, credential_type=None, roles=None,
|
def get_client_manager(cls, credential_type=None, roles=None,
|
||||||
force_new=None):
|
force_new=None, scope=None):
|
||||||
"""Returns an OpenStack client manager
|
"""Returns an OpenStack client manager
|
||||||
|
|
||||||
Returns an OpenStack client manager based on either credential_type
|
Returns an OpenStack client manager based on either credential_type
|
||||||
|
@ -666,6 +676,7 @@ class BaseTestCase(testtools.testcase.WithAttributes,
|
||||||
credential_type 'primary'
|
credential_type 'primary'
|
||||||
:param credential_type: string - primary, alt or admin
|
:param credential_type: string - primary, alt or admin
|
||||||
:param roles: list of roles
|
:param roles: list of roles
|
||||||
|
:param scope: scope for the test user
|
||||||
|
|
||||||
:returns: the created client manager
|
:returns: the created client manager
|
||||||
:raises skipException: if the requested credentials are not available
|
:raises skipException: if the requested credentials are not available
|
||||||
|
@ -684,7 +695,7 @@ class BaseTestCase(testtools.testcase.WithAttributes,
|
||||||
" is not able to provide credentials with the %s role "
|
" is not able to provide credentials with the %s role "
|
||||||
"assigned." % (cls.__name__, role))
|
"assigned." % (cls.__name__, role))
|
||||||
raise cls.skipException(skip_msg)
|
raise cls.skipException(skip_msg)
|
||||||
params = dict(roles=roles)
|
params = dict(roles=roles, scope=scope)
|
||||||
if force_new is not None:
|
if force_new is not None:
|
||||||
params.update(force_new=force_new)
|
params.update(force_new=force_new)
|
||||||
creds = cred_provider.get_creds_by_roles(**params)
|
creds = cred_provider.get_creds_by_roles(**params)
|
||||||
|
@ -851,7 +862,13 @@ class BaseTestCase(testtools.testcase.WithAttributes,
|
||||||
if isinstance(credentials_type, six.string_types):
|
if isinstance(credentials_type, six.string_types):
|
||||||
manager = cls.get_client_manager(credential_type=credentials_type)
|
manager = cls.get_client_manager(credential_type=credentials_type)
|
||||||
elif isinstance(credentials_type, list):
|
elif isinstance(credentials_type, list):
|
||||||
manager = cls.get_client_manager(roles=credentials_type[1:])
|
scope = 'project'
|
||||||
|
if credentials_type[0].startswith('system'):
|
||||||
|
scope = 'system'
|
||||||
|
elif credentials_type[0].startswith('domain'):
|
||||||
|
scope = 'domain'
|
||||||
|
manager = cls.get_client_manager(roles=credentials_type[1:],
|
||||||
|
scope=scope)
|
||||||
else:
|
else:
|
||||||
manager = cls.get_client_manager()
|
manager = cls.get_client_manager()
|
||||||
|
|
||||||
|
|
|
@ -291,6 +291,100 @@ class TestDynamicCredentialProvider(base.TestCase):
|
||||||
self.assertEqual(role_creds.tenant_id, '1234')
|
self.assertEqual(role_creds.tenant_id, '1234')
|
||||||
self.assertEqual(role_creds.user_id, '1234')
|
self.assertEqual(role_creds.user_id, '1234')
|
||||||
|
|
||||||
|
@mock.patch('tempest.lib.common.rest_client.RestClient')
|
||||||
|
def test_role_creds_with_project_scope(self, MockRestClient):
|
||||||
|
creds = dynamic_creds.DynamicCredentialProvider(**self.fixed_params)
|
||||||
|
self._mock_list_2_roles()
|
||||||
|
self._mock_user_create('1234', 'fake_role_user')
|
||||||
|
self._mock_tenant_create('1234', 'fake_role_project')
|
||||||
|
|
||||||
|
user_mock = mock.patch.object(self.roles_client.RolesClient,
|
||||||
|
'create_user_role_on_project')
|
||||||
|
user_mock.start()
|
||||||
|
self.addCleanup(user_mock.stop)
|
||||||
|
with mock.patch.object(self.roles_client.RolesClient,
|
||||||
|
'create_user_role_on_project') as user_mock:
|
||||||
|
role_creds = creds.get_creds_by_roles(
|
||||||
|
roles=['role1', 'role2'], scope='project')
|
||||||
|
calls = user_mock.mock_calls
|
||||||
|
# Assert that the role creation is called with the 2 specified roles
|
||||||
|
self.assertEqual(len(calls), 2)
|
||||||
|
args = map(lambda x: x[1], calls)
|
||||||
|
args = list(args)
|
||||||
|
self.assertIn(('1234', '1234', '1234'), args)
|
||||||
|
self.assertIn(('1234', '1234', '12345'), args)
|
||||||
|
self.assertEqual(role_creds.username, 'fake_role_user')
|
||||||
|
self.assertEqual(role_creds.project_name, 'fake_role_project')
|
||||||
|
# Verify IDs
|
||||||
|
self.assertEqual(role_creds.project_id, '1234')
|
||||||
|
self.assertEqual(role_creds.user_id, '1234')
|
||||||
|
|
||||||
|
@mock.patch('tempest.lib.common.rest_client.RestClient')
|
||||||
|
def _test_get_same_role_creds_with_project_scope(self, MockRestClient,
|
||||||
|
scope=None):
|
||||||
|
creds = dynamic_creds.DynamicCredentialProvider(**self.fixed_params)
|
||||||
|
self._mock_list_2_roles()
|
||||||
|
self._mock_user_create('1234', 'fake_role_user')
|
||||||
|
self._mock_tenant_create('1234', 'fake_role_project')
|
||||||
|
with mock.patch.object(self.roles_client.RolesClient,
|
||||||
|
'create_user_role_on_project') as user_mock:
|
||||||
|
role_creds = creds.get_creds_by_roles(
|
||||||
|
roles=['role1', 'role2'], scope=scope)
|
||||||
|
calls = user_mock.mock_calls
|
||||||
|
# Assert that the role creation is called with the 2 specified roles
|
||||||
|
self.assertEqual(len(calls), 2)
|
||||||
|
|
||||||
|
# Fetch the same creds again
|
||||||
|
with mock.patch.object(self.roles_client.RolesClient,
|
||||||
|
'create_user_role_on_project') as user_mock1:
|
||||||
|
role_creds_new = creds.get_creds_by_roles(
|
||||||
|
roles=['role1', 'role2'], scope=scope)
|
||||||
|
calls = user_mock1.mock_calls
|
||||||
|
# Assert that previously created creds are return and no call to
|
||||||
|
# role creation.
|
||||||
|
self.assertEqual(len(calls), 0)
|
||||||
|
# Check if previously created creds are returned.
|
||||||
|
self.assertEqual(role_creds, role_creds_new)
|
||||||
|
|
||||||
|
def test_get_same_role_creds_with_project_scope(self):
|
||||||
|
self._test_get_same_role_creds_with_project_scope(scope='project')
|
||||||
|
|
||||||
|
def test_get_same_role_creds_with_default_scope(self):
|
||||||
|
self._test_get_same_role_creds_with_project_scope()
|
||||||
|
|
||||||
|
@mock.patch('tempest.lib.common.rest_client.RestClient')
|
||||||
|
def _test_get_different_role_creds_with_project_scope(
|
||||||
|
self, MockRestClient, scope=None):
|
||||||
|
creds = dynamic_creds.DynamicCredentialProvider(**self.fixed_params)
|
||||||
|
self._mock_list_2_roles()
|
||||||
|
self._mock_user_create('1234', 'fake_role_user')
|
||||||
|
self._mock_tenant_create('1234', 'fake_role_project')
|
||||||
|
with mock.patch.object(self.roles_client.RolesClient,
|
||||||
|
'create_user_role_on_project') as user_mock:
|
||||||
|
role_creds = creds.get_creds_by_roles(
|
||||||
|
roles=['role1', 'role2'], scope=scope)
|
||||||
|
calls = user_mock.mock_calls
|
||||||
|
# Assert that the role creation is called with the 2 specified roles
|
||||||
|
self.assertEqual(len(calls), 2)
|
||||||
|
# Fetch the creds with one role different
|
||||||
|
with mock.patch.object(self.roles_client.RolesClient,
|
||||||
|
'create_user_role_on_project') as user_mock1:
|
||||||
|
role_creds_new = creds.get_creds_by_roles(
|
||||||
|
roles=['role1'], scope=scope)
|
||||||
|
calls = user_mock1.mock_calls
|
||||||
|
# Because one role is different, assert that the role creation
|
||||||
|
# is called with the 1 specified roles
|
||||||
|
self.assertEqual(len(calls), 1)
|
||||||
|
# Check new creds is created for new roles.
|
||||||
|
self.assertNotEqual(role_creds, role_creds_new)
|
||||||
|
|
||||||
|
def test_get_different_role_creds_with_project_scope(self):
|
||||||
|
self._test_get_different_role_creds_with_project_scope(
|
||||||
|
scope='project')
|
||||||
|
|
||||||
|
def test_get_different_role_creds_with_default_scope(self):
|
||||||
|
self._test_get_different_role_creds_with_project_scope()
|
||||||
|
|
||||||
@mock.patch('tempest.lib.common.rest_client.RestClient')
|
@mock.patch('tempest.lib.common.rest_client.RestClient')
|
||||||
def test_all_cred_cleanup(self, MockRestClient):
|
def test_all_cred_cleanup(self, MockRestClient):
|
||||||
creds = dynamic_creds.DynamicCredentialProvider(**self.fixed_params)
|
creds = dynamic_creds.DynamicCredentialProvider(**self.fixed_params)
|
||||||
|
@ -707,6 +801,232 @@ class TestDynamicCredentialProviderV3(TestDynamicCredentialProvider):
|
||||||
(200, {'project': {'id': id, 'name': name}}))))
|
(200, {'project': {'id': id, 'name': name}}))))
|
||||||
return project_fix
|
return project_fix
|
||||||
|
|
||||||
|
@mock.patch('tempest.lib.common.rest_client.RestClient')
|
||||||
|
def test_role_creds_with_system_scope(self, MockRestClient):
|
||||||
|
creds = dynamic_creds.DynamicCredentialProvider(**self.fixed_params)
|
||||||
|
self._mock_list_2_roles()
|
||||||
|
self._mock_user_create('1234', 'fake_role_user')
|
||||||
|
|
||||||
|
with mock.patch.object(self.roles_client.RolesClient,
|
||||||
|
'create_user_role_on_system') as user_mock:
|
||||||
|
role_creds = creds.get_creds_by_roles(
|
||||||
|
roles=['role1', 'role2'], scope='system')
|
||||||
|
calls = user_mock.mock_calls
|
||||||
|
# Assert that the role creation is called with the 2 specified roles
|
||||||
|
self.assertEqual(len(calls), 2)
|
||||||
|
args = map(lambda x: x[1], calls)
|
||||||
|
args = list(args)
|
||||||
|
self.assertIn(('1234', '1234'), args)
|
||||||
|
self.assertIn(('1234', '12345'), args)
|
||||||
|
self.assertEqual(role_creds.username, 'fake_role_user')
|
||||||
|
self.assertEqual(role_creds.user_id, '1234')
|
||||||
|
# Verify system scope
|
||||||
|
self.assertEqual(role_creds.system, 'all')
|
||||||
|
# Verify domain is default
|
||||||
|
self.assertEqual(role_creds.domain_id, 'default')
|
||||||
|
self.assertEqual(role_creds.domain_name, 'Default')
|
||||||
|
|
||||||
|
@mock.patch('tempest.lib.common.rest_client.RestClient')
|
||||||
|
def test_get_same_role_creds_with_system_scope(self, MockRestClient):
|
||||||
|
creds = dynamic_creds.DynamicCredentialProvider(**self.fixed_params)
|
||||||
|
self._mock_list_2_roles()
|
||||||
|
self._mock_user_create('1234', 'fake_role_user')
|
||||||
|
with mock.patch.object(self.roles_client.RolesClient,
|
||||||
|
'create_user_role_on_system') as user_mock:
|
||||||
|
role_creds = creds.get_creds_by_roles(
|
||||||
|
roles=['role1', 'role2'], scope='system')
|
||||||
|
calls = user_mock.mock_calls
|
||||||
|
# Assert that the role creation is called with the 2 specified roles
|
||||||
|
self.assertEqual(len(calls), 2)
|
||||||
|
|
||||||
|
# Fetch the same creds again
|
||||||
|
with mock.patch.object(self.roles_client.RolesClient,
|
||||||
|
'create_user_role_on_system') as user_mock1:
|
||||||
|
role_creds_new = creds.get_creds_by_roles(
|
||||||
|
roles=['role1', 'role2'], scope='system')
|
||||||
|
calls = user_mock1.mock_calls
|
||||||
|
# Assert that previously created creds are return and no call to
|
||||||
|
# role creation.
|
||||||
|
self.assertEqual(len(calls), 0)
|
||||||
|
# Verify system scope
|
||||||
|
self.assertEqual(role_creds_new.system, 'all')
|
||||||
|
# Check if previously created creds are returned.
|
||||||
|
self.assertEqual(role_creds, role_creds_new)
|
||||||
|
|
||||||
|
@mock.patch('tempest.lib.common.rest_client.RestClient')
|
||||||
|
def test_get_different_role_creds_with_system_scope(self, MockRestClient):
|
||||||
|
creds = dynamic_creds.DynamicCredentialProvider(**self.fixed_params)
|
||||||
|
self._mock_list_2_roles()
|
||||||
|
self._mock_user_create('1234', 'fake_role_user')
|
||||||
|
|
||||||
|
with mock.patch.object(self.roles_client.RolesClient,
|
||||||
|
'create_user_role_on_system') as user_mock:
|
||||||
|
role_creds = creds.get_creds_by_roles(
|
||||||
|
roles=['role1', 'role2'], scope='system')
|
||||||
|
calls = user_mock.mock_calls
|
||||||
|
# Assert that the role creation is called with the 2 specified roles
|
||||||
|
self.assertEqual(len(calls), 2)
|
||||||
|
# Verify system scope
|
||||||
|
self.assertEqual(role_creds.system, 'all')
|
||||||
|
# Fetch the creds with one role different
|
||||||
|
with mock.patch.object(self.roles_client.RolesClient,
|
||||||
|
'create_user_role_on_system') as user_mock1:
|
||||||
|
role_creds_new = creds.get_creds_by_roles(
|
||||||
|
roles=['role1'], scope='system')
|
||||||
|
calls = user_mock1.mock_calls
|
||||||
|
# Because one role is different, assert that the role creation
|
||||||
|
# is called with the 1 specified roles
|
||||||
|
self.assertEqual(len(calls), 1)
|
||||||
|
# Verify Scope
|
||||||
|
self.assertEqual(role_creds_new.system, 'all')
|
||||||
|
# Check new creds is created for new roles.
|
||||||
|
self.assertNotEqual(role_creds, role_creds_new)
|
||||||
|
|
||||||
|
@mock.patch('tempest.lib.common.rest_client.RestClient')
|
||||||
|
def test_role_creds_with_domain_scope(self, MockRestClient):
|
||||||
|
creds = dynamic_creds.DynamicCredentialProvider(**self.fixed_params)
|
||||||
|
self._mock_list_2_roles()
|
||||||
|
self._mock_user_create('1234', 'fake_role_user')
|
||||||
|
|
||||||
|
domain = {
|
||||||
|
"id": '12',
|
||||||
|
"enabled": True,
|
||||||
|
"name": "TestDomain"
|
||||||
|
}
|
||||||
|
|
||||||
|
self.useFixture(fixtures.MockPatch(
|
||||||
|
'tempest.lib.common.cred_client.V3CredsClient.create_domain',
|
||||||
|
return_value=domain))
|
||||||
|
|
||||||
|
with mock.patch.object(self.roles_client.RolesClient,
|
||||||
|
'create_user_role_on_domain') as user_mock:
|
||||||
|
role_creds = creds.get_creds_by_roles(
|
||||||
|
roles=['role1', 'role2'], scope='domain')
|
||||||
|
calls = user_mock.mock_calls
|
||||||
|
# Assert that the role creation is called with the 2 specified roles
|
||||||
|
self.assertEqual(len(calls), 2)
|
||||||
|
args = map(lambda x: x[1], calls)
|
||||||
|
args = list(args)
|
||||||
|
self.assertIn((domain['id'], '1234', '1234'), args)
|
||||||
|
self.assertIn((domain['id'], '1234', '12345'), args)
|
||||||
|
self.assertEqual(role_creds.username, 'fake_role_user')
|
||||||
|
self.assertEqual(role_creds.user_id, '1234')
|
||||||
|
# Verify creds are under new created domain
|
||||||
|
self.assertEqual(role_creds.domain_id, domain['id'])
|
||||||
|
self.assertEqual(role_creds.domain_name, domain['name'])
|
||||||
|
# Verify that Scope is None
|
||||||
|
self.assertIsNone(role_creds.system)
|
||||||
|
|
||||||
|
@mock.patch('tempest.lib.common.rest_client.RestClient')
|
||||||
|
def test_get_same_role_creds_with_domain_scope(self, MockRestClient):
|
||||||
|
creds = dynamic_creds.DynamicCredentialProvider(**self.fixed_params)
|
||||||
|
self._mock_list_2_roles()
|
||||||
|
self._mock_user_create('1234', 'fake_role_user')
|
||||||
|
|
||||||
|
domain = {
|
||||||
|
"id": '12',
|
||||||
|
"enabled": True,
|
||||||
|
"name": "TestDomain"
|
||||||
|
}
|
||||||
|
|
||||||
|
self.useFixture(fixtures.MockPatch(
|
||||||
|
'tempest.lib.common.cred_client.V3CredsClient.create_domain',
|
||||||
|
return_value=domain))
|
||||||
|
|
||||||
|
with mock.patch.object(self.roles_client.RolesClient,
|
||||||
|
'create_user_role_on_domain') as user_mock:
|
||||||
|
role_creds = creds.get_creds_by_roles(
|
||||||
|
roles=['role1', 'role2'], scope='domain')
|
||||||
|
calls = user_mock.mock_calls
|
||||||
|
# Assert that the role creation is called with the 2 specified roles
|
||||||
|
self.assertEqual(len(calls), 2)
|
||||||
|
self.assertEqual(role_creds.user_id, '1234')
|
||||||
|
# Verify Scope
|
||||||
|
self.assertIsNone(role_creds.system)
|
||||||
|
# Fetch the same creds again
|
||||||
|
with mock.patch.object(self.roles_client.RolesClient,
|
||||||
|
'create_user_role_on_domain') as user_mock1:
|
||||||
|
role_creds_new = creds.get_creds_by_roles(
|
||||||
|
roles=['role1', 'role2'], scope='domain')
|
||||||
|
calls = user_mock1.mock_calls
|
||||||
|
# Assert that previously created creds are return and no call to
|
||||||
|
# role creation.
|
||||||
|
self.assertEqual(len(calls), 0)
|
||||||
|
# Verify Scope
|
||||||
|
self.assertIsNone(role_creds_new.system)
|
||||||
|
# Check if previously created creds are returned.
|
||||||
|
self.assertEqual(role_creds, role_creds_new)
|
||||||
|
|
||||||
|
@mock.patch('tempest.lib.common.rest_client.RestClient')
|
||||||
|
def test_get_different_role_creds_with_domain_scope(self, MockRestClient):
|
||||||
|
creds = dynamic_creds.DynamicCredentialProvider(**self.fixed_params)
|
||||||
|
self._mock_list_2_roles()
|
||||||
|
self._mock_user_create('1234', 'fake_role_user')
|
||||||
|
|
||||||
|
domain = {
|
||||||
|
"id": '12',
|
||||||
|
"enabled": True,
|
||||||
|
"name": "TestDomain"
|
||||||
|
}
|
||||||
|
|
||||||
|
self.useFixture(fixtures.MockPatch(
|
||||||
|
'tempest.lib.common.cred_client.V3CredsClient.create_domain',
|
||||||
|
return_value=domain))
|
||||||
|
|
||||||
|
with mock.patch.object(self.roles_client.RolesClient,
|
||||||
|
'create_user_role_on_domain') as user_mock:
|
||||||
|
role_creds = creds.get_creds_by_roles(
|
||||||
|
roles=['role1', 'role2'], scope='domain')
|
||||||
|
calls = user_mock.mock_calls
|
||||||
|
# Assert that the role creation is called with the 2 specified roles
|
||||||
|
self.assertEqual(len(calls), 2)
|
||||||
|
self.assertEqual(role_creds.user_id, '1234')
|
||||||
|
# Verify Scope
|
||||||
|
self.assertIsNone(role_creds.system)
|
||||||
|
# Fetch the same creds again
|
||||||
|
with mock.patch.object(self.roles_client.RolesClient,
|
||||||
|
'create_user_role_on_domain') as user_mock1:
|
||||||
|
role_creds_new = creds.get_creds_by_roles(
|
||||||
|
roles=['role1'], scope='domain')
|
||||||
|
calls = user_mock1.mock_calls
|
||||||
|
# Because one role is different, assert that the role creation
|
||||||
|
# is called with the 1 specified roles
|
||||||
|
self.assertEqual(len(calls), 1)
|
||||||
|
# Verify Scope
|
||||||
|
self.assertIsNone(role_creds_new.system)
|
||||||
|
# Check new creds is created for new roles.
|
||||||
|
self.assertNotEqual(role_creds, role_creds_new)
|
||||||
|
|
||||||
|
@mock.patch('tempest.lib.common.rest_client.RestClient')
|
||||||
|
def test_get_role_creds_with_different_scope(self, MockRestClient):
|
||||||
|
creds = dynamic_creds.DynamicCredentialProvider(**self.fixed_params)
|
||||||
|
self._mock_list_2_roles()
|
||||||
|
self._mock_user_create('1234', 'fake_role_user')
|
||||||
|
self._mock_tenant_create('1234', 'fake_role_project')
|
||||||
|
with mock.patch.object(self.roles_client.RolesClient,
|
||||||
|
'create_user_role_on_system') as user_mock:
|
||||||
|
role_creds = creds.get_creds_by_roles(
|
||||||
|
roles=['role1', 'role2'], scope='system')
|
||||||
|
calls = user_mock.mock_calls
|
||||||
|
# Assert that the role creation is called with the 2 specified roles
|
||||||
|
self.assertEqual(len(calls), 2)
|
||||||
|
# Verify Scope
|
||||||
|
self.assertEqual(role_creds.system, 'all')
|
||||||
|
|
||||||
|
# Fetch the same role creds but with different scope
|
||||||
|
with mock.patch.object(self.roles_client.RolesClient,
|
||||||
|
'create_user_role_on_project') as user_mock1:
|
||||||
|
role_creds_new = creds.get_creds_by_roles(
|
||||||
|
roles=['role1', 'role2'], scope='project')
|
||||||
|
calls = user_mock1.mock_calls
|
||||||
|
# Because scope is different, assert that the role creation
|
||||||
|
# is called with the 2 specified roles
|
||||||
|
self.assertEqual(len(calls), 2)
|
||||||
|
# Verify Scope
|
||||||
|
self.assertIsNone(role_creds_new.system)
|
||||||
|
# Check that created creds are different
|
||||||
|
self.assertNotEqual(role_creds, role_creds_new)
|
||||||
|
|
||||||
@mock.patch('tempest.lib.common.rest_client.RestClient')
|
@mock.patch('tempest.lib.common.rest_client.RestClient')
|
||||||
def test_member_role_creation_with_duplicate(self, rest_client_mock):
|
def test_member_role_creation_with_duplicate(self, rest_client_mock):
|
||||||
creds = dynamic_creds.DynamicCredentialProvider(**self.fixed_params)
|
creds = dynamic_creds.DynamicCredentialProvider(**self.fixed_params)
|
||||||
|
|
|
@ -109,7 +109,7 @@ class TestBaseTestCase(base.TestCase):
|
||||||
|
|
||||||
test.BaseTestCase.get_tenant_network(credentials_type=creds)
|
test.BaseTestCase.get_tenant_network(credentials_type=creds)
|
||||||
|
|
||||||
mock_gcm.assert_called_once_with(roles=['role1'])
|
mock_gcm.assert_called_once_with(roles=['role1'], scope='project')
|
||||||
mock_gprov.assert_called_once_with()
|
mock_gprov.assert_called_once_with()
|
||||||
mock_gtn.assert_called_once_with(mock_prov, net_client,
|
mock_gtn.assert_called_once_with(mock_prov, net_client,
|
||||||
self.fixed_network_name)
|
self.fixed_network_name)
|
||||||
|
|
|
@ -453,6 +453,130 @@ class TestTempestBaseTestClass(base.TestCase):
|
||||||
expected_creds[1][1:],
|
expected_creds[1][1:],
|
||||||
mock_get_client_manager.mock_calls[1][2]['roles'])
|
mock_get_client_manager.mock_calls[1][2]['roles'])
|
||||||
|
|
||||||
|
def test_setup_credentials_with_role_and_system_scope(self):
|
||||||
|
expected_creds = [['system_my_role', 'role1', 'role2']]
|
||||||
|
|
||||||
|
class SystemRoleCredentials(self.parent_test):
|
||||||
|
credentials = expected_creds
|
||||||
|
|
||||||
|
expected_clients = 'clients'
|
||||||
|
with mock.patch.object(
|
||||||
|
SystemRoleCredentials,
|
||||||
|
'get_client_manager') as mock_get_client_manager:
|
||||||
|
mock_get_client_manager.return_value = expected_clients
|
||||||
|
sys_creds = SystemRoleCredentials()
|
||||||
|
sys_creds.setup_credentials()
|
||||||
|
self.assertTrue(hasattr(sys_creds, 'os_system_my_role'))
|
||||||
|
self.assertEqual(expected_clients, sys_creds.os_system_my_role)
|
||||||
|
self.assertTrue(hasattr(sys_creds, 'os_roles_system_my_role'))
|
||||||
|
self.assertEqual(expected_clients, sys_creds.os_roles_system_my_role)
|
||||||
|
self.assertEqual(1, mock_get_client_manager.call_count)
|
||||||
|
self.assertEqual(
|
||||||
|
expected_creds[0][1:],
|
||||||
|
mock_get_client_manager.mock_calls[0][2]['roles'])
|
||||||
|
self.assertEqual(
|
||||||
|
'system',
|
||||||
|
mock_get_client_manager.mock_calls[0][2]['scope'])
|
||||||
|
|
||||||
|
def test_setup_credentials_with_multiple_role_and_system_scope(self):
|
||||||
|
expected_creds = [['system_my_role', 'role1', 'role2'],
|
||||||
|
['system_my_role2', 'role1', 'role2'],
|
||||||
|
['system_my_role3', 'role3']]
|
||||||
|
|
||||||
|
class SystemRoleCredentials(self.parent_test):
|
||||||
|
credentials = expected_creds
|
||||||
|
|
||||||
|
expected_clients = 'clients'
|
||||||
|
with mock.patch.object(
|
||||||
|
SystemRoleCredentials,
|
||||||
|
'get_client_manager') as mock_get_client_manager:
|
||||||
|
mock_get_client_manager.return_value = expected_clients
|
||||||
|
sys_creds = SystemRoleCredentials()
|
||||||
|
sys_creds.setup_credentials()
|
||||||
|
self.assertTrue(hasattr(sys_creds, 'os_system_my_role'))
|
||||||
|
self.assertEqual(expected_clients, sys_creds.os_system_my_role)
|
||||||
|
self.assertTrue(hasattr(sys_creds, 'os_roles_system_my_role'))
|
||||||
|
self.assertEqual(expected_clients, sys_creds.os_roles_system_my_role)
|
||||||
|
self.assertTrue(hasattr(sys_creds, 'os_system_my_role2'))
|
||||||
|
self.assertEqual(expected_clients, sys_creds.os_system_my_role2)
|
||||||
|
self.assertTrue(hasattr(sys_creds, 'os_roles_system_my_role2'))
|
||||||
|
self.assertEqual(expected_clients, sys_creds.os_roles_system_my_role2)
|
||||||
|
self.assertTrue(hasattr(sys_creds, 'os_system_my_role3'))
|
||||||
|
self.assertEqual(expected_clients, sys_creds.os_system_my_role3)
|
||||||
|
self.assertTrue(hasattr(sys_creds, 'os_roles_system_my_role3'))
|
||||||
|
self.assertEqual(expected_clients, sys_creds.os_roles_system_my_role3)
|
||||||
|
self.assertEqual(3, mock_get_client_manager.call_count)
|
||||||
|
self.assertEqual(
|
||||||
|
expected_creds[0][1:],
|
||||||
|
mock_get_client_manager.mock_calls[0][2]['roles'])
|
||||||
|
self.assertEqual(
|
||||||
|
'system', mock_get_client_manager.mock_calls[0][2]['scope'])
|
||||||
|
self.assertEqual(
|
||||||
|
expected_creds[1][1:],
|
||||||
|
mock_get_client_manager.mock_calls[1][2]['roles'])
|
||||||
|
self.assertEqual(
|
||||||
|
'system', mock_get_client_manager.mock_calls[1][2]['scope'])
|
||||||
|
self.assertEqual(
|
||||||
|
expected_creds[2][1:],
|
||||||
|
mock_get_client_manager.mock_calls[2][2]['roles'])
|
||||||
|
self.assertEqual(
|
||||||
|
'system', mock_get_client_manager.mock_calls[2][2]['scope'])
|
||||||
|
|
||||||
|
def test_setup_credentials_with_role_and_multiple_scope(self):
|
||||||
|
expected_creds = [['my_role', 'role1', 'role2'],
|
||||||
|
['project_my_role', 'role1', 'role2'],
|
||||||
|
['domain_my_role', 'role1', 'role2'],
|
||||||
|
['system_my_role', 'role1', 'role2']]
|
||||||
|
|
||||||
|
class SystemRoleCredentials(self.parent_test):
|
||||||
|
credentials = expected_creds
|
||||||
|
|
||||||
|
expected_clients = 'clients'
|
||||||
|
with mock.patch.object(
|
||||||
|
SystemRoleCredentials,
|
||||||
|
'get_client_manager') as mock_get_client_manager:
|
||||||
|
mock_get_client_manager.return_value = expected_clients
|
||||||
|
sys_creds = SystemRoleCredentials()
|
||||||
|
sys_creds.setup_credentials()
|
||||||
|
self.assertTrue(hasattr(sys_creds, 'os_my_role'))
|
||||||
|
self.assertEqual(expected_clients, sys_creds.os_my_role)
|
||||||
|
self.assertTrue(hasattr(sys_creds, 'os_roles_my_role'))
|
||||||
|
self.assertEqual(expected_clients, sys_creds.os_roles_my_role)
|
||||||
|
self.assertTrue(hasattr(sys_creds, 'os_project_my_role'))
|
||||||
|
self.assertEqual(expected_clients, sys_creds.os_project_my_role)
|
||||||
|
self.assertTrue(hasattr(sys_creds, 'os_roles_project_my_role'))
|
||||||
|
self.assertEqual(expected_clients, sys_creds.os_roles_project_my_role)
|
||||||
|
self.assertTrue(hasattr(sys_creds, 'os_domain_my_role'))
|
||||||
|
self.assertEqual(expected_clients, sys_creds.os_domain_my_role)
|
||||||
|
self.assertTrue(hasattr(sys_creds, 'os_roles_domain_my_role'))
|
||||||
|
self.assertEqual(expected_clients, sys_creds.os_roles_domain_my_role)
|
||||||
|
self.assertTrue(hasattr(sys_creds, 'os_system_my_role'))
|
||||||
|
self.assertEqual(expected_clients, sys_creds.os_system_my_role)
|
||||||
|
self.assertTrue(hasattr(sys_creds, 'os_roles_system_my_role'))
|
||||||
|
self.assertEqual(expected_clients, sys_creds.os_roles_system_my_role)
|
||||||
|
|
||||||
|
self.assertEqual(4, mock_get_client_manager.call_count)
|
||||||
|
self.assertEqual(
|
||||||
|
expected_creds[0][1:],
|
||||||
|
mock_get_client_manager.mock_calls[0][2]['roles'])
|
||||||
|
self.assertEqual(
|
||||||
|
'project', mock_get_client_manager.mock_calls[0][2]['scope'])
|
||||||
|
self.assertEqual(
|
||||||
|
expected_creds[1][1:],
|
||||||
|
mock_get_client_manager.mock_calls[1][2]['roles'])
|
||||||
|
self.assertEqual(
|
||||||
|
'project', mock_get_client_manager.mock_calls[1][2]['scope'])
|
||||||
|
self.assertEqual(
|
||||||
|
expected_creds[2][1:],
|
||||||
|
mock_get_client_manager.mock_calls[2][2]['roles'])
|
||||||
|
self.assertEqual(
|
||||||
|
'domain', mock_get_client_manager.mock_calls[2][2]['scope'])
|
||||||
|
self.assertEqual(
|
||||||
|
expected_creds[3][1:],
|
||||||
|
mock_get_client_manager.mock_calls[3][2]['roles'])
|
||||||
|
self.assertEqual(
|
||||||
|
'system', mock_get_client_manager.mock_calls[3][2]['scope'])
|
||||||
|
|
||||||
def test_setup_class_overwritten(self):
|
def test_setup_class_overwritten(self):
|
||||||
|
|
||||||
class OverridesSetup(self.parent_test):
|
class OverridesSetup(self.parent_test):
|
||||||
|
|
Loading…
Reference in New Issue