Fix creation of requested creds within the same project

We have a bug in dynamic creds creation where project creds
with different roles are created under a new projects. Creds
of different role of projects must be created within the same
project.

Fixing the creation of 'project_admin', 'project_member',
'project_reader', 'primary' creds in same projects. All the alt
creds will be created under same projects. but main and alt creds
will use different project, for example, 'project_alt_member'
and 'project_member' creds will be created in different project.

'admin' creds will continue be in new project as many test
use it as legacy admin.

Closes-Bug: #1964509
Change-Id: I9af005e2900195c42ecbbf7434facae2d3952f30
This commit is contained in:
Ghanshyam Mann 2023-01-18 23:22:29 -06:00 committed by Ghanshyam
parent 11d4fc9e41
commit 35fc95dbd0
5 changed files with 387 additions and 65 deletions

View File

@ -203,6 +203,10 @@ Project scoped personas:
cls.az_p_reader_client = (
cls.os_project_reader.availability_zone_client)
.. note::
'primary', 'project_admin', 'project_member', and 'project_reader'
credentials will be created under same project.
#. Project alternate Admin: This is supported and can be requested and used from
the test as below:
@ -248,6 +252,10 @@ Project scoped personas:
cls.az_p_alt_reader_client = (
cls.os_project_alt_reader.availability_zone_client)
.. note::
'alt', 'project_alt_admin', 'project_alt_member', and
'project_alt_reader' credentials will be created under same project.
#. Project other roles: This is supported and can be requested and used from
the test as below:
@ -269,6 +277,16 @@ Project scoped personas:
cls.az_role2_client = (
cls.os_project_my_role2.availability_zone_client)
.. note::
'admin' credenatials is considered and kept as legacy admin and
will be created under new project. If any test want to test with
admin role in projectA and non-admin/admin in projectB then test
can request projectA admin using 'admin' or 'project_alt_admin'
and non-admin in projectB using 'primary', 'project_member',
or 'project_reader'/admin in projectB using 'project_admin'. Many
existing tests using the 'admin' with new project to assert on the
resource list so we are keeping 'admin' a kind of legacy admin.
Pre-Provisioned Credentials
---------------------------

View File

@ -0,0 +1,19 @@
---
fixes:
- |
There was a bug (bug#1964509) in dynamic credentials creation where
project credentials with different roles are created with the new
projects. Credential of different role of projects must be created
within the same project. For exmaple, 'project_admin', 'project_member',
'project_reader', and 'primary', credentials will be created in the
same projects. 'alt', 'project_alt_admin', 'project_alt_member',
'project_alt_reader' will be created within the same project.
'admin' credenatials is considered and kept as legacy admin and
will be created under new project. If any test want to test with
admin role in projectA and non-admin/admin in projectB then test
can request projectA admin using 'admin' or 'project_alt_admin'
and non-admin in projectB using 'primary', 'project_member',
or 'project_reader'/admin in projectB using 'project_admin'. Many
existing tests using the 'admin' with new project to assert on the
resource list so we are keeping 'admin' a kind of legacy admin.

View File

@ -58,6 +58,10 @@ class CredsClient(object, metaclass=abc.ABCMeta):
def create_project(self, name, description):
pass
@abc.abstractmethod
def show_project(self, project_id):
pass
def _check_role_exists(self, role_name):
try:
roles = self._list_roles()
@ -118,6 +122,9 @@ class V2CredsClient(CredsClient):
name=name, description=description)['tenant']
return tenant
def show_project(self, project_id):
return self.projects_client.show_tenant(project_id)['tenant']
def delete_project(self, project_id):
self.projects_client.delete_tenant(project_id)
@ -159,6 +166,9 @@ class V3CredsClient(CredsClient):
domain_id=self.creds_domain['id'])['project']
return project
def show_project(self, project_id):
return self.projects_client.show_project(project_id)['project']
def delete_project(self, project_id):
self.projects_client.delete_project(project_id)

View File

@ -163,7 +163,8 @@ class DynamicCredentialProvider(cred_provider.CredentialProvider):
os.network.PortsClient(),
os.network.SecurityGroupsClient())
def _create_creds(self, admin=False, roles=None, scope='project'):
def _create_creds(self, admin=False, roles=None, scope='project',
project_id=None):
"""Create credentials with random name.
Creates user and role assignments on a project, domain, or system. When
@ -177,6 +178,8 @@ class DynamicCredentialProvider(cred_provider.CredentialProvider):
:type roles: list
:param str scope: The scope for the role assignment, may be one of
'project', 'domain', or 'system'.
:param str project_id: The project id of already created project
for credentials under same project.
:return: Readonly Credentials with network resources
:raises: Exception if scope is invalid
"""
@ -190,12 +193,20 @@ class DynamicCredentialProvider(cred_provider.CredentialProvider):
'system': None
}
if scope == 'project':
project_name = data_utils.rand_name(
root, prefix=self.resource_prefix)
project_desc = project_name + '-desc'
project = self.creds_client.create_project(
name=project_name, description=project_desc)
if not project_id:
project_name = data_utils.rand_name(
root, prefix=self.resource_prefix)
project_desc = project_name + '-desc'
project = self.creds_client.create_project(
name=project_name, description=project_desc)
else:
# NOTE(gmann) This is the case where creds are requested
# from the existing creds within same project. We should
# not create the new project in this case.
project = self.creds_client.show_project(project_id)
project_name = project['name']
LOG.info("Using the existing project %s for scope %s and "
"roles: %s", project['id'], scope, roles)
# NOTE(andreaf) User and project can be distinguished from the
# context, having the same ID in both makes it easier to match them
# and debug.
@ -372,48 +383,78 @@ class DynamicCredentialProvider(cred_provider.CredentialProvider):
self.routers_admin_client.add_router_interface(router_id,
subnet_id=subnet_id)
def get_credentials(self, credential_type, scope=None):
if not scope and self._creds.get(str(credential_type)):
credentials = self._creds[str(credential_type)]
elif scope and (
self._creds.get("%s_%s" % (scope, str(credential_type)))):
credentials = self._creds["%s_%s" % (scope, str(credential_type))]
def _get_project_id(self, credential_type, scope):
same_creds = [['admin'], ['member'], ['reader']]
same_alt_creds = [['alt_admin'], ['alt_member'], ['alt_reader']]
search_in = []
if credential_type in same_creds:
search_in = same_creds
elif credential_type in same_alt_creds:
search_in = same_alt_creds
for cred in search_in:
found_cred = self._creds.get("%s_%s" % (scope, str(cred)))
if found_cred:
project_id = found_cred.get("%s_%s" % (scope, 'id'))
LOG.debug("Reusing existing project %s from creds: %s ",
project_id, found_cred)
return project_id
return None
def get_credentials(self, credential_type, scope=None, by_role=False):
cred_prefix = ''
if by_role:
cred_prefix = 'role_'
if not scope and self._creds.get(
"%s%s" % (cred_prefix, str(credential_type))):
credentials = self._creds[
"%s%s" % (cred_prefix, str(credential_type))]
elif scope and (self._creds.get(
"%s%s_%s" % (cred_prefix, scope, str(credential_type)))):
credentials = self._creds[
"%s%s_%s" % (cred_prefix, scope, str(credential_type))]
else:
LOG.debug("Creating new dynamic creds for scope: %s and "
"credential_type: %s", scope, credential_type)
project_id = None
if scope:
if credential_type in [['admin'], ['alt_admin']]:
if scope == 'project':
project_id = self._get_project_id(
credential_type, 'project')
if by_role:
credentials = self._create_creds(
admin=True, scope=scope)
roles=credential_type, scope=scope)
elif credential_type in [['admin'], ['alt_admin']]:
credentials = self._create_creds(
admin=True, scope=scope, project_id=project_id)
elif credential_type in [['alt_member'], ['alt_reader']]:
cred_type = credential_type[0][4:]
if isinstance(cred_type, str):
cred_type = [cred_type]
credentials = self._create_creds(
roles=cred_type, scope=scope)
else:
roles=cred_type, scope=scope, project_id=project_id)
elif credential_type in [['member'], ['reader']]:
credentials = self._create_creds(
roles=credential_type, scope=scope)
roles=credential_type, scope=scope,
project_id=project_id)
elif credential_type in ['primary', 'alt', 'admin']:
is_admin = (credential_type == 'admin')
credentials = self._create_creds(admin=is_admin)
else:
credentials = self._create_creds(roles=credential_type)
if scope:
self._creds["%s_%s" %
(scope, str(credential_type))] = credentials
self._creds["%s%s_%s" % (
cred_prefix, scope, str(credential_type))] = credentials
else:
self._creds[str(credential_type)] = credentials
self._creds[
"%s%s" % (cred_prefix, str(credential_type))] = credentials
# Maintained until tests are ported
LOG.info("Acquired dynamic creds:\n"
" credentials: %s", credentials)
# NOTE(gmann): For 'domain' and 'system' scoped token, there is no
# project_id so we are skipping the network creation for both
# scope. How these scoped token can create the network, Nova
# server or other project mapped resources is one of the open
# question and discussed a lot in Xena cycle PTG. Once we sort
# out that then if needed we can update the network creation here.
if (not scope or scope == 'project'):
# scope.
# We need to create nework resource once per project.
if (not project_id and (not scope or scope == 'project')):
if (self.neutron_available and self.create_networks):
network, subnet, router = self._create_network_resources(
credentials.tenant_id)
@ -422,24 +463,22 @@ class DynamicCredentialProvider(cred_provider.CredentialProvider):
LOG.info("Created isolated network resources for:\n"
" credentials: %s", credentials)
else:
LOG.info("Network resources are not created for scope: %s",
scope)
LOG.info("Network resources are not created for requested "
"scope: %s and credentials: %s", scope, credentials)
return credentials
# TODO(gmann): Remove this method in favor of get_project_member_creds()
# after the deprecation phase.
def get_primary_creds(self):
return self.get_credentials('primary')
return self.get_project_member_creds()
# TODO(gmann): Remove this method in favor of get_project_admin_creds()
# after the deprecation phase.
def get_admin_creds(self):
return self.get_credentials('admin')
# TODO(gmann): Replace this method with more appropriate name.
# like get_project_alt_member_creds()
# TODO(gmann): Remove this method in favor of
# get_project_alt_member_creds() after the deprecation phase.
def get_alt_creds(self):
return self.get_credentials('alt')
return self.get_project_alt_member_creds()
def get_system_admin_creds(self):
return self.get_credentials(['admin'], scope='system')
@ -481,9 +520,9 @@ class DynamicCredentialProvider(cred_provider.CredentialProvider):
roles = list(set(roles))
# The roles list as a str will become the index as the dict key for
# the created credentials set in the dynamic_creds dict.
creds_name = str(roles)
creds_name = "role_%s" % str(roles)
if scope:
creds_name = "%s_%s" % (scope, str(roles))
creds_name = "role_%s_%s" % (scope, str(roles))
exist_creds = self._creds.get(creds_name)
# If force_new flag is True 2 cred sets with the same roles are needed
# handle this by creating a separate index for old one to store it
@ -492,7 +531,7 @@ class DynamicCredentialProvider(cred_provider.CredentialProvider):
new_index = creds_name + '-' + str(len(self._creds))
self._creds[new_index] = exist_creds
del self._creds[creds_name]
return self.get_credentials(roles, scope=scope)
return self.get_credentials(roles, scope=scope, by_role=True)
def _clear_isolated_router(self, router_id, router_name):
client = self.routers_admin_client
@ -553,31 +592,20 @@ class DynamicCredentialProvider(cred_provider.CredentialProvider):
if not self._creds:
return
self._clear_isolated_net_resources()
project_ids = set()
for creds in self._creds.values():
# NOTE(gmann): With new RBAC personas, we can have single project
# and multiple user created under it, to avoid conflict let's
# cleanup the projects at the end.
# Adding project if id is not None, means leaving domain and
# system creds.
if creds.project_id:
project_ids.add(creds.project_id)
try:
self.creds_client.delete_user(creds.user_id)
except lib_exc.NotFound:
LOG.warning("user with name: %s not found for delete",
creds.username)
if creds.tenant_id:
# NOTE(zhufl): Only when neutron's security_group ext is
# enabled, cleanup_default_secgroup will not raise error. But
# here cannot use test_utils.is_extension_enabled for it will
# cause "circular dependency". So here just use try...except to
# ensure tenant deletion without big changes.
try:
if self.neutron_available:
self.cleanup_default_secgroup(
self.security_groups_admin_client, creds.tenant_id)
except lib_exc.NotFound:
LOG.warning("failed to cleanup tenant %s's secgroup",
creds.tenant_name)
try:
self.creds_client.delete_project(creds.tenant_id)
except lib_exc.NotFound:
LOG.warning("tenant with name: %s not found for delete",
creds.tenant_name)
# if cred is domain scoped, delete ephemeral domain
# do not delete default domain
if (hasattr(creds, 'domain_id') and
@ -587,6 +615,28 @@ class DynamicCredentialProvider(cred_provider.CredentialProvider):
except lib_exc.NotFound:
LOG.warning("domain with name: %s not found for delete",
creds.domain_name)
for project_id in project_ids:
# NOTE(zhufl): Only when neutron's security_group ext is
# enabled, cleanup_default_secgroup will not raise error. But
# here cannot use test_utils.is_extension_enabled for it will
# cause "circular dependency". So here just use try...except to
# ensure tenant deletion without big changes.
LOG.info("Deleting project and security group for project: %s",
project_id)
try:
if self.neutron_available:
self.cleanup_default_secgroup(
self.security_groups_admin_client, project_id)
except lib_exc.NotFound:
LOG.warning("failed to cleanup tenant %s's secgroup",
project_id)
try:
self.creds_client.delete_project(project_id)
except lib_exc.NotFound:
LOG.warning("tenant with id: %s not found for delete",
project_id)
self._creds = {}
def is_multi_user(self):

View File

@ -60,6 +60,7 @@ class TestDynamicCredentialProvider(base.TestCase):
fake_response = fake_identity._fake_v2_response
tenants_client_class = tenants_client.TenantsClient
delete_tenant = 'delete_tenant'
create_tenant = 'create_tenant'
def setUp(self):
super(TestDynamicCredentialProvider, self).setUp()
@ -140,7 +141,9 @@ class TestDynamicCredentialProvider(base.TestCase):
return_value=(rest_client.ResponseBody
(200, {'roles': [
{'id': '1', 'name': 'FakeRole'},
{'id': '2', 'name': 'member'}]}))))
{'id': '2', 'name': 'member'},
{'id': '3', 'name': 'reader'},
{'id': '4', 'name': 'admin'}]}))))
return roles_fix
def _mock_list_ec2_credentials(self, user_id, tenant_id):
@ -191,6 +194,205 @@ class TestDynamicCredentialProvider(base.TestCase):
self.assertEqual(primary_creds.tenant_id, '1234')
self.assertEqual(primary_creds.user_id, '1234')
def _request_and_check_second_creds(
self, creds_obj, func, creds_to_compare,
show_mock, sm_count=1, sm_count_in_diff_project=0,
same_project_request=True, **func_kwargs):
self._mock_user_create('111', 'fake_user')
with mock.patch.object(creds_obj.creds_client,
'create_project') as create_mock:
create_mock.return_value = {'id': '22', 'name': 'fake_project'}
new_creds = func(**func_kwargs)
if same_project_request:
# Check that with second creds request, create_project is not
# called and show_project is called. Which means new project is
# not created for the second requested creds instead new user is
# created under existing project.
self.assertEqual(len(create_mock.mock_calls), 0)
self.assertEqual(len(show_mock.mock_calls), sm_count)
# Verify project name and id is same as creds_to_compare
self.assertEqual(creds_to_compare.tenant_name,
new_creds.tenant_name)
self.assertEqual(creds_to_compare.tenant_id,
new_creds.tenant_id)
else:
# Check that with different project creds request, create_project
# is called and show_project is not called. Which means new project
# is created for this new creds request.
self.assertEqual(len(create_mock.mock_calls), 1)
self.assertEqual(len(show_mock.mock_calls),
sm_count_in_diff_project)
# Verify project name and id is not same as creds_to_compare
self.assertNotEqual(creds_to_compare.tenant_name,
new_creds.tenant_name)
self.assertNotEqual(creds_to_compare.tenant_id,
new_creds.tenant_id)
self.assertEqual(new_creds.tenant_name, 'fake_project')
self.assertEqual(new_creds.tenant_id, '22')
# Verify new user name and id
self.assertEqual(new_creds.username, 'fake_user')
self.assertEqual(new_creds.user_id, '111')
return new_creds
@mock.patch('tempest.lib.common.rest_client.RestClient')
def _creds_within_same_project(self, MockRestClient, test_alt_creds=False):
creds = dynamic_creds.DynamicCredentialProvider(**self.fixed_params)
if test_alt_creds:
admin_func = creds.get_project_alt_admin_creds
member_func = creds.get_project_alt_member_creds
reader_func = creds.get_project_alt_reader_creds
else:
admin_func = creds.get_project_admin_creds
member_func = creds.get_project_member_creds
reader_func = creds.get_project_reader_creds
self._mock_assign_user_role()
self._mock_list_role()
self._mock_user_create('11', 'fake_user1')
show_mock = self.patchobject(creds.creds_client, 'show_project')
show_mock.return_value = {'id': '21', 'name': 'fake_project1'}
with mock.patch.object(creds.creds_client,
'create_project') as create_mock:
create_mock.return_value = {'id': '21', 'name': 'fake_project1'}
member_creds = member_func()
# Check that with first creds request, create_project is called and
# show_project is not called. Which means new project is created for
# the requested creds.
self.assertEqual(len(create_mock.mock_calls), 1)
self.assertEqual(len(show_mock.mock_calls), 0)
# Verify project, user name and IDs
self.assertEqual(member_creds.username, 'fake_user1')
self.assertEqual(member_creds.tenant_name, 'fake_project1')
self.assertEqual(member_creds.tenant_id, '21')
self.assertEqual(member_creds.user_id, '11')
# Now request for the project reader creds which should not create new
# project instead should use the project_id of member_creds already
# created project.
self._request_and_check_second_creds(
creds, reader_func, member_creds, show_mock)
# Now request for the project admin creds which should not create new
# project instead should use the project_id of member_creds already
# created project.
self._request_and_check_second_creds(
creds, admin_func, member_creds, show_mock, sm_count=2)
def test_creds_within_same_project(self):
self._creds_within_same_project()
def test_alt_creds_within_same_project(self):
self._creds_within_same_project(test_alt_creds=True)
@mock.patch('tempest.lib.common.rest_client.RestClient')
def test_creds_in_different_project(self, MockRestClient):
creds = dynamic_creds.DynamicCredentialProvider(**self.fixed_params)
self._mock_assign_user_role()
self._mock_list_role()
self._mock_user_create('11', 'fake_user1')
show_mock = self.patchobject(creds.creds_client, 'show_project')
show_mock.return_value = {'id': '21', 'name': 'fake_project1'}
with mock.patch.object(creds.creds_client,
'create_project') as create_mock:
create_mock.return_value = {'id': '21', 'name': 'fake_project1'}
member_creds = creds.get_project_member_creds()
# Check that with first creds request, create_project is called and
# show_project is not called. Which means new project is created for
# the requested creds.
self.assertEqual(len(create_mock.mock_calls), 1)
self.assertEqual(len(show_mock.mock_calls), 0)
# Verify project, user name and IDs
self.assertEqual(member_creds.username, 'fake_user1')
self.assertEqual(member_creds.tenant_name, 'fake_project1')
self.assertEqual(member_creds.tenant_id, '21')
self.assertEqual(member_creds.user_id, '11')
# Now request for the project alt reader creds which should create
# new project as this request is for alt creds.
alt_reader_creds = self._request_and_check_second_creds(
creds, creds.get_project_alt_reader_creds,
member_creds, show_mock, same_project_request=False)
# Check that with second creds request, create_project is not called
# and show_project is called. Which means new project is not created
# for the second requested creds instead new user is created under
# existing project.
self._request_and_check_second_creds(
creds, creds.get_project_reader_creds, member_creds, show_mock)
# Now request for the project alt member creds which should not create
# new project instead use the alt project already created for
# alt_reader creds.
show_mock.return_value = {
'id': alt_reader_creds.tenant_id,
'name': alt_reader_creds.tenant_name}
self._request_and_check_second_creds(
creds, creds.get_project_alt_member_creds,
alt_reader_creds, show_mock, sm_count=2,
same_project_request=True)
@mock.patch('tempest.lib.common.rest_client.RestClient')
def test_creds_by_role_in_different_project(self, MockRestClient):
creds = dynamic_creds.DynamicCredentialProvider(**self.fixed_params)
self._mock_assign_user_role()
self._mock_list_role()
self._mock_user_create('11', 'fake_user1')
show_mock = self.patchobject(creds.creds_client, 'show_project')
show_mock.return_value = {'id': '21', 'name': 'fake_project1'}
with mock.patch.object(creds.creds_client,
'create_project') as create_mock:
create_mock.return_value = {'id': '21', 'name': 'fake_project1'}
member_creds = creds.get_project_member_creds()
# Check that with first creds request, create_project is called and
# show_project is not called. Which means new project is created for
# the requested creds.
self.assertEqual(len(create_mock.mock_calls), 1)
self.assertEqual(len(show_mock.mock_calls), 0)
# Verify project, user name and IDs
self.assertEqual(member_creds.username, 'fake_user1')
self.assertEqual(member_creds.tenant_name, 'fake_project1')
self.assertEqual(member_creds.tenant_id, '21')
self.assertEqual(member_creds.user_id, '11')
# Check that with second creds request, create_project is not called
# and show_project is called. Which means new project is not created
# for the second requested creds instead new user is created under
# existing project.
self._request_and_check_second_creds(
creds, creds.get_project_reader_creds, member_creds, show_mock)
# Now request the creds by role which should create new project.
self._request_and_check_second_creds(
creds, creds.get_creds_by_roles, member_creds, show_mock,
sm_count_in_diff_project=1, same_project_request=False,
roles=['member'], scope='project')
@mock.patch('tempest.lib.common.rest_client.RestClient')
def test_legacy_admin_creds_in_different_project(self, MockRestClient):
creds = dynamic_creds.DynamicCredentialProvider(**self.fixed_params)
self._mock_assign_user_role()
self._mock_list_role()
self._mock_user_create('11', 'fake_user1')
show_mock = self.patchobject(creds.creds_client, 'show_project')
show_mock.return_value = {'id': '21', 'name': 'fake_project1'}
with mock.patch.object(creds.creds_client,
'create_project') as create_mock:
create_mock.return_value = {'id': '21', 'name': 'fake_project1'}
member_creds = creds.get_project_member_creds()
# Check that with first creds request, create_project is called and
# show_project is not called. Which means new project is created for
# the requested creds.
self.assertEqual(len(create_mock.mock_calls), 1)
self.assertEqual(len(show_mock.mock_calls), 0)
# Verify project, user name and IDs
self.assertEqual(member_creds.username, 'fake_user1')
self.assertEqual(member_creds.tenant_name, 'fake_project1')
self.assertEqual(member_creds.tenant_id, '21')
self.assertEqual(member_creds.user_id, '11')
# Now request for the legacy admin creds which should create
# new project instead of using project member creds project.
self._request_and_check_second_creds(
creds, creds.get_admin_creds,
member_creds, show_mock, same_project_request=False)
@mock.patch('tempest.lib.common.rest_client.RestClient')
def test_admin_creds(self, MockRestClient):
creds = dynamic_creds.DynamicCredentialProvider(**self.fixed_params)
@ -321,7 +523,8 @@ class TestDynamicCredentialProvider(base.TestCase):
@mock.patch('tempest.lib.common.rest_client.RestClient')
def _test_get_same_role_creds_with_project_scope(self, MockRestClient,
scope=None):
scope=None,
force_new=False):
creds = dynamic_creds.DynamicCredentialProvider(**self.fixed_params)
self._mock_list_2_roles()
self._mock_user_create('1234', 'fake_role_user')
@ -329,7 +532,7 @@ class TestDynamicCredentialProvider(base.TestCase):
with mock.patch.object(self.roles_client.RolesClient,
'create_user_role_on_project') as user_mock:
role_creds = creds.get_creds_by_roles(
roles=['role1', 'role2'], scope=scope)
roles=['role1', 'role2'], force_new=force_new, scope=scope)
calls = user_mock.mock_calls
# Assert that the role creation is called with the 2 specified roles
self.assertEqual(len(calls), 2)
@ -338,13 +541,18 @@ class TestDynamicCredentialProvider(base.TestCase):
with mock.patch.object(self.roles_client.RolesClient,
'create_user_role_on_project') as user_mock1:
role_creds_new = creds.get_creds_by_roles(
roles=['role1', 'role2'], scope=scope)
roles=['role1', 'role2'], force_new=force_new, scope=scope)
calls = user_mock1.mock_calls
# With force_new, assert that new creds are created
if force_new:
self.assertEqual(len(calls), 2)
self.assertNotEqual(role_creds, role_creds_new)
# Assert that previously created creds are return and no call to
# role creation.
self.assertEqual(len(calls), 0)
# role creation
# Check if previously created creds are returned.
self.assertEqual(role_creds, role_creds_new)
else:
self.assertEqual(len(calls), 0)
self.assertEqual(role_creds, role_creds_new)
def test_get_same_role_creds_with_project_scope(self):
self._test_get_same_role_creds_with_project_scope(scope='project')
@ -352,6 +560,13 @@ class TestDynamicCredentialProvider(base.TestCase):
def test_get_same_role_creds_with_default_scope(self):
self._test_get_same_role_creds_with_project_scope()
def test_get_same_role_creds_with_project_scope_force_new(self):
self._test_get_same_role_creds_with_project_scope(
scope='project', force_new=True)
def test_get_same_role_creds_with_default_scope_force_new(self):
self._test_get_same_role_creds_with_project_scope(force_new=True)
@mock.patch('tempest.lib.common.rest_client.RestClient')
def _test_get_different_role_creds_with_project_scope(
self, MockRestClient, scope=None):
@ -391,8 +606,12 @@ class TestDynamicCredentialProvider(base.TestCase):
self._mock_assign_user_role()
self._mock_list_role()
self._mock_tenant_create('1234', 'fake_prim_tenant')
self._mock_user_create('1234', 'fake_prim_user')
show_mock = self.patchobject(creds.creds_client, 'show_project')
show_mock.return_value = {'id': '1234', 'name': 'fake_prim_tenant'}
self._mock_user_create('1234', 'fake_project1_user')
creds.get_primary_creds()
self._mock_user_create('12341', 'fake_project1_user')
creds.get_project_admin_creds()
self._mock_tenant_create('12345', 'fake_alt_tenant')
self._mock_user_create('12345', 'fake_alt_user')
creds.get_alt_creds()
@ -407,10 +626,11 @@ class TestDynamicCredentialProvider(base.TestCase):
creds.clear_creds()
# Verify user delete calls
calls = user_mock.mock_calls
self.assertEqual(len(calls), 3)
self.assertEqual(len(calls), 4)
args = map(lambda x: x[1][0], calls)
args = list(args)
self.assertIn('1234', args)
self.assertIn('12341', args)
self.assertIn('12345', args)
self.assertIn('123456', args)
# Verify tenant delete calls
@ -512,6 +732,9 @@ class TestDynamicCredentialProvider(base.TestCase):
self._mock_list_role()
self._mock_user_create('1234', 'fake_prim_user')
self._mock_tenant_create('1234', 'fake_prim_tenant')
show_mock = self.patchobject(creds.creds_client, 'show_project')
show_mock.return_value = {'id': '1234', 'name': 'fake_prim_tenant'}
self._mock_user_create('12341', 'fake_project1_user')
self._mock_network_create(creds, '1234', 'fake_net')
self._mock_subnet_create(creds, '1234', 'fake_subnet')
self._mock_router_create('1234', 'fake_router')
@ -519,6 +742,7 @@ class TestDynamicCredentialProvider(base.TestCase):
'tempest.lib.services.network.routers_client.RoutersClient.'
'add_router_interface')
creds.get_primary_creds()
creds.get_project_admin_creds()
router_interface_mock.assert_called_once_with('1234', subnet_id='1234')
router_interface_mock.reset_mock()
# Create alternate tenant and network
@ -779,6 +1003,7 @@ class TestDynamicCredentialProviderV3(TestDynamicCredentialProvider):
fake_response = fake_identity._fake_v3_response
tenants_client_class = tenants_client.ProjectsClient
delete_tenant = 'delete_project'
create_tenant = 'create_project'
def setUp(self):
super(TestDynamicCredentialProviderV3, self).setUp()