Add credential generation for manager personas

Prepares credentials for domain manager and project manager personas,
required for RBAC tests in Keystone.

Related-Bug: https://review.opendev.org/c/openstack/keystone/+/924132
Change-Id: I2d133326eddb3cf70e2bb1711d8e20f4514f8fca
This commit is contained in:
Markus Hentsch 2024-07-15 11:22:37 +02:00
parent e47f4457cb
commit a0199bfcd0
6 changed files with 145 additions and 5 deletions

View File

@ -0,0 +1,7 @@
---
features:
- |
Add support for project manager and domain manager personas by adding
``get_project_manager_creds`` and ``get_domain_manager_creds`` to
the ``DynamicCredentialProvider`` and ``PreProvisionedCredentialProvider``
classes of the common library.

View File

@ -75,6 +75,10 @@ class CredentialProvider(object, metaclass=abc.ABCMeta):
def get_domain_admin_creds(self): def get_domain_admin_creds(self):
return return
@abc.abstractmethod
def get_domain_manager_creds(self):
return
@abc.abstractmethod @abc.abstractmethod
def get_domain_member_creds(self): def get_domain_member_creds(self):
return return
@ -91,6 +95,10 @@ class CredentialProvider(object, metaclass=abc.ABCMeta):
def get_project_alt_admin_creds(self): def get_project_alt_admin_creds(self):
return return
@abc.abstractmethod
def get_project_manager_creds(self):
return
@abc.abstractmethod @abc.abstractmethod
def get_project_member_creds(self): def get_project_member_creds(self):
return return

View File

@ -432,7 +432,7 @@ class DynamicCredentialProvider(cred_provider.CredentialProvider):
cred_type = [cred_type] cred_type = [cred_type]
credentials = self._create_creds( credentials = self._create_creds(
roles=cred_type, scope=scope, project_id=project_id) roles=cred_type, scope=scope, project_id=project_id)
elif credential_type in [['member'], ['reader']]: elif credential_type in [['manager'], ['member'], ['reader']]:
credentials = self._create_creds( credentials = self._create_creds(
roles=credential_type, scope=scope, roles=credential_type, scope=scope,
project_id=project_id) project_id=project_id)
@ -492,6 +492,9 @@ class DynamicCredentialProvider(cred_provider.CredentialProvider):
def get_domain_admin_creds(self): def get_domain_admin_creds(self):
return self.get_credentials(['admin'], scope='domain') return self.get_credentials(['admin'], scope='domain')
def get_domain_manager_creds(self):
return self.get_credentials(['manager'], scope='domain')
def get_domain_member_creds(self): def get_domain_member_creds(self):
return self.get_credentials(['member'], scope='domain') return self.get_credentials(['member'], scope='domain')
@ -504,6 +507,9 @@ class DynamicCredentialProvider(cred_provider.CredentialProvider):
def get_project_alt_admin_creds(self): def get_project_alt_admin_creds(self):
return self.get_credentials(['alt_admin'], scope='project') return self.get_credentials(['alt_admin'], scope='project')
def get_project_manager_creds(self):
return self.get_credentials(['manager'], scope='project')
def get_project_member_creds(self): def get_project_member_creds(self):
return self.get_credentials(['member'], scope='project') return self.get_credentials(['member'], scope='project')

View File

@ -353,6 +353,13 @@ class PreProvisionedCredentialProvider(cred_provider.CredentialProvider):
self._creds['domain_admin'] = domain_admin self._creds['domain_admin'] = domain_admin
return domain_admin return domain_admin
def get_domain_manager_creds(self):
if self._creds.get('domain_manager'):
return self._creds.get('domain_manager')
domain_manager = self._get_creds(['manager'], scope='domain')
self._creds['domain_manager'] = domain_manager
return domain_manager
def get_domain_member_creds(self): def get_domain_member_creds(self):
if self._creds.get('domain_member'): if self._creds.get('domain_member'):
return self._creds.get('domain_member') return self._creds.get('domain_member')
@ -378,6 +385,13 @@ class PreProvisionedCredentialProvider(cred_provider.CredentialProvider):
# TODO(gmann): Implement alt admin hash. # TODO(gmann): Implement alt admin hash.
return return
def get_project_manager_creds(self):
if self._creds.get('project_manager'):
return self._creds.get('project_manager')
project_manager = self._get_creds(['manager'], scope='project')
self._creds['project_manager'] = project_manager
return project_manager
def get_project_member_creds(self): def get_project_member_creds(self):
if self._creds.get('project_member'): if self._creds.get('project_member'):
return self._creds.get('project_member') return self._creds.get('project_member')

View File

@ -104,6 +104,14 @@ class TestDynamicCredentialProvider(base.TestCase):
(200, {'tenant': {'id': id, 'name': name}})))) (200, {'tenant': {'id': id, 'name': name}}))))
return tenant_fix return tenant_fix
def _mock_domain_create(self, id, name):
domain_fix = self.useFixture(fixtures.MockPatchObject(
self.domains_client.DomainsClient,
'create_domain',
return_value=(rest_client.ResponseBody
(200, {'domain': {'id': id, 'name': name}}))))
return domain_fix
def _mock_list_roles(self, id, name): def _mock_list_roles(self, id, name):
roles_fix = self.useFixture(fixtures.MockPatchObject( roles_fix = self.useFixture(fixtures.MockPatchObject(
self.roles_client.RolesClient, self.roles_client.RolesClient,
@ -143,7 +151,8 @@ class TestDynamicCredentialProvider(base.TestCase):
{'id': '1', 'name': 'FakeRole'}, {'id': '1', 'name': 'FakeRole'},
{'id': '2', 'name': 'member'}, {'id': '2', 'name': 'member'},
{'id': '3', 'name': 'reader'}, {'id': '3', 'name': 'reader'},
{'id': '4', 'name': 'admin'}]})))) {'id': '4', 'name': 'manager'},
{'id': '5', 'name': 'admin'}]}))))
return roles_fix return roles_fix
def _mock_list_ec2_credentials(self, user_id, tenant_id): def _mock_list_ec2_credentials(self, user_id, tenant_id):
@ -999,6 +1008,7 @@ class TestDynamicCredentialProviderV3(TestDynamicCredentialProvider):
roles_client = v3_roles_client roles_client = v3_roles_client
tenants_client = v3_projects_client tenants_client = v3_projects_client
users_client = v3_users_client users_client = v3_users_client
domains_client = domains_client
token_client_class = token_client.V3TokenClient token_client_class = token_client.V3TokenClient
fake_response = fake_identity._fake_v3_response fake_response = fake_identity._fake_v3_response
tenants_client_class = tenants_client.ProjectsClient tenants_client_class = tenants_client.ProjectsClient
@ -1263,3 +1273,47 @@ class TestDynamicCredentialProviderV3(TestDynamicCredentialProvider):
"member role already exists, ignoring conflict.") "member role already exists, ignoring conflict.")
creds.creds_client.assign_user_role.assert_called_once_with( creds.creds_client.assign_user_role.assert_called_once_with(
mock.ANY, mock.ANY, 'member') mock.ANY, mock.ANY, 'member')
@mock.patch('tempest.lib.common.rest_client.RestClient')
def test_project_manager_creds(self, MockRestClient):
creds = dynamic_creds.DynamicCredentialProvider(**self.fixed_params)
self._mock_list_roles('1234', 'manager')
self._mock_user_create('1234', 'fake_manager_user')
self._mock_tenant_create('1234', 'fake_manager_tenant')
user_mock = mock.patch.object(self.roles_client.RolesClient,
'create_user_role_on_project')
user_mock.start()
self.addCleanup(user_mock.stop)
with mock.patch.object(self.roles_client.RolesClient,
'create_user_role_on_project') as user_mock:
manager_creds = creds.get_project_manager_creds()
user_mock.assert_has_calls([
mock.call('1234', '1234', '1234')])
self.assertEqual(manager_creds.username, 'fake_manager_user')
self.assertEqual(manager_creds.tenant_name, 'fake_manager_tenant')
# Verify IDs
self.assertEqual(manager_creds.tenant_id, '1234')
self.assertEqual(manager_creds.user_id, '1234')
@mock.patch('tempest.lib.common.rest_client.RestClient')
def test_domain_manager_creds(self, MockRestClient):
creds = dynamic_creds.DynamicCredentialProvider(**self.fixed_params)
self._mock_list_roles('1234', 'manager')
self._mock_user_create('1234', 'fake_manager_user')
self._mock_domain_create('1234', 'fake_manager_domain')
user_mock = mock.patch.object(self.roles_client.RolesClient,
'create_user_role_on_domain')
user_mock.start()
self.addCleanup(user_mock.stop)
with mock.patch.object(self.roles_client.RolesClient,
'create_user_role_on_domain') as user_mock:
manager_creds = creds.get_domain_manager_creds()
user_mock.assert_has_calls([
mock.call('1234', '1234', '1234')])
self.assertEqual(manager_creds.username, 'fake_manager_user')
self.assertEqual(manager_creds.domain_name, 'fake_manager_domain')
# Verify IDs
self.assertEqual(manager_creds.domain_id, '1234')
self.assertEqual(manager_creds.user_id, '1234')

View File

@ -77,7 +77,13 @@ class TestPreProvisionedCredentials(base.TestCase):
{'username': 'test_admin2', 'project_name': 'test_tenant12', {'username': 'test_admin2', 'project_name': 'test_tenant12',
'password': 'p', 'roles': [admin_role]}, 'password': 'p', 'roles': [admin_role]},
{'username': 'test_admin3', 'project_name': 'test_tenant13', {'username': 'test_admin3', 'project_name': 'test_tenant13',
'password': 'p', 'types': ['admin']}] 'password': 'p', 'types': ['admin']},
{'username': 'test_project_manager1',
'project_name': 'test_tenant14', 'password': 'p',
'roles': ['manager']},
{'username': 'test_project_manager2',
'tenant_name': 'test_tenant15', 'password': 'p',
'roles': ['manager']}]
def setUp(self): def setUp(self):
super(TestPreProvisionedCredentials, self).setUp() super(TestPreProvisionedCredentials, self).setUp()
@ -319,7 +325,7 @@ class TestPreProvisionedCredentials(base.TestCase):
calls = get_free_hash_mock.mock.mock_calls calls = get_free_hash_mock.mock.mock_calls
self.assertEqual(len(calls), 1) self.assertEqual(len(calls), 1)
args = calls[0][1][0] args = calls[0][1][0]
self.assertEqual(len(args), 10) self.assertEqual(len(args), 12)
for i in admin_hashes: for i in admin_hashes:
self.assertNotIn(i, args) self.assertNotIn(i, args)
@ -431,6 +437,26 @@ class TestPreProvisionedCredentials(base.TestCase):
# Get one more # Get one more
test_accounts_class.get_admin_creds() test_accounts_class.get_admin_creds()
def test_get_project_manager_creds(self):
test_accounts_class = preprov_creds.PreProvisionedCredentialProvider(
**self.fixed_params)
p_manager_creds = test_accounts_class.get_project_manager_creds()
self.assertNotIn('test_admin', p_manager_creds.username)
self.assertNotIn('test_user', p_manager_creds.username)
self.assertIn('test_project_manager', p_manager_creds.username)
def test_get_project_manager_creds_none_available(self):
admin_accounts = [x for x in self.test_accounts if 'test_admin'
in x['username']]
self.useFixture(fixtures.MockPatch(
'tempest.lib.common.preprov_creds.read_accounts_yaml',
return_value=admin_accounts))
test_accounts_class = preprov_creds.PreProvisionedCredentialProvider(
**self.fixed_params)
with testtools.ExpectedException(lib_exc.InvalidCredentials):
# Get one more
test_accounts_class.get_project_manager_creds()
class TestPreProvisionedCredentialsV3(TestPreProvisionedCredentials): class TestPreProvisionedCredentialsV3(TestPreProvisionedCredentials):
@ -480,4 +506,29 @@ class TestPreProvisionedCredentialsV3(TestPreProvisionedCredentials):
{'username': 'test_admin2', 'project_name': 'test_project12', {'username': 'test_admin2', 'project_name': 'test_project12',
'domain_name': 'domain', 'password': 'p', 'roles': [admin_role]}, 'domain_name': 'domain', 'password': 'p', 'roles': [admin_role]},
{'username': 'test_admin3', 'project_name': 'test_tenant13', {'username': 'test_admin3', 'project_name': 'test_tenant13',
'domain_name': 'domain', 'password': 'p', 'types': ['admin']}] 'domain_name': 'domain', 'password': 'p', 'types': ['admin']},
{'username': 'test_project_manager1',
'project_name': 'test_project14', 'domain_name': 'domain',
'password': 'p', 'roles': ['manager']},
{'username': 'test_domain_manager1',
'domain_name': 'domain', 'password': 'p', 'roles': ['manager']}]
def test_get_domain_manager_creds(self):
test_accounts_class = preprov_creds.PreProvisionedCredentialProvider(
**self.fixed_params)
d_manager_creds = test_accounts_class.get_domain_manager_creds()
self.assertNotIn('test_admin', d_manager_creds.username)
self.assertNotIn('test_user', d_manager_creds.username)
self.assertIn('test_domain_manager', d_manager_creds.username)
def test_get_domain_manager_creds_none_available(self):
admin_accounts = [x for x in self.test_accounts if 'test_admin'
in x['username']]
self.useFixture(fixtures.MockPatch(
'tempest.lib.common.preprov_creds.read_accounts_yaml',
return_value=admin_accounts))
test_accounts_class = preprov_creds.PreProvisionedCredentialProvider(
**self.fixed_params)
with testtools.ExpectedException(lib_exc.InvalidCredentials):
# Get one more
test_accounts_class.get_domain_manager_creds()