Add secure-rbac test for Containers

This patch adds rbac tests to the Containers resource to test
secure-rbac policies within a project

This patch also removes the use of do_request in the existing tests
as that method is being deprecated in favor of using the clients
directly.

* PEP8 Fixes
* Fix the plurality of the method names
* Remove _by_id from certain methods to maintain consistency

Change-Id: I80aba2934110965866d1583309df7f2ca9ef4c27
This commit is contained in:
Dave Wilde (d34dh0r53) 2021-08-24 13:35:52 +00:00 committed by Ade Lee
parent 087f89dd16
commit 4e5e45748d
2 changed files with 226 additions and 147 deletions

View File

@ -144,7 +144,7 @@ class BarbicanV1RbacBase(test.BaseTestCase):
)
# setup clients for admin persona
# this client is used for any cleanupi/setup etc. as needed
# this client is used for any cleanup/setup etc. as needed
adm = cls.os_project_admin
cls.admin_secret_client = adm.secret_v1.SecretClient()
cls.admin_secret_metadata_client = adm.secret_v1.SecretMetadataClient(
@ -230,6 +230,14 @@ class BarbicanV1RbacBase(test.BaseTestCase):
"""add empty secret as admin user """
return self.admin_secret_client.create_secret(name=secret_name)
def create_empty_container_admin(self,
container_name,
container_type='generic'):
"""add empty container as admin user"""
return self.admin_container_client.create_container(
name=container_name,
type=container_type)
def create_aes_secret_admin(self, secret_name):
key = create_aes_key()
expire_time = (datetime.utcnow() + timedelta(days=5))

View File

@ -37,7 +37,7 @@ class BarbicanV1RbacContainers:
"""Test create_container policy
Testing: POST /v1/containers
Thist test must check:
This test must check:
* whether the persona can create a new container
"""
raise NotImplementedError
@ -47,7 +47,7 @@ class BarbicanV1RbacContainers:
"""Test get_container policy
Testing: GET /v1/containers/{container-id}
Thist test must check:
This test must check:
* whether the persona can get a container
"""
raise NotImplementedError
@ -57,17 +57,97 @@ class BarbicanV1RbacContainers:
"""Test delete_container policy
Testing: DELETE /v1/containers/{container-id}
Thist test must check:
This test must check:
* whether the persona can delete a container
"""
raise NotImplementedError
@abc.abstractmethod
def test_get_container_acl(self):
"""Test get_container_acl policy
Testing: GET /v1/containers/{container-id}/acl
This test must check:
* whether the persona can get a containers acl
"""
raise NotImplementedError
@abc.abstractmethod
def test_update_container_acl(self):
"""Test update_container_acl policy
Testing: PATCH /v1/containers/{container-id}/acl
This test must check:
* whether the persona can update an existing containers acl
"""
raise NotImplementedError
@abc.abstractmethod
def test_create_container_acl(self):
"""Test create_container_acl policy
Testing: PUT /v1/containers/{container-id}/acl
This test must check:
* whether the persona can create a containers acl
"""
raise NotImplementedError
@abc.abstractmethod
def test_delete_container_acl(self):
"""Test delete_container_acl policy
Testing: DELETE /v1/containers/{container-id}
This test must check:
* whether the persona can delete a containers acl
"""
raise NotImplementedError
@abc.abstractmethod
def test_list_container_consumers(self):
"""Test list_container_consumers policy
Testing: GET /v1/containers/{container-id}/consumers
This test must check:
* whether the persona can list a containers consumers
"""
raise NotImplementedError
@abc.abstractmethod
def test_create_container_consumer(self):
"""Test create_container_consumer policy
Testing: POST /v1/containers/{container-id}/consumers
This test must check:
* whether the persona can create a consumer of the container
"""
raise NotImplementedError
@abc.abstractmethod
def test_get_container_consumer(self):
"""Test get_container_consumer policy
Testing: GET /v1/containers/{container-id}/consumers/{consumer-id}
This test must check:
* whether the persona can get a containers consumer by id
"""
raise NotImplementedError
@abc.abstractmethod
def test_delete_container_consumer(self):
"""Test delete_container_consumer policy
Testing: DELETE /v1/containers/{container-id}/consumers/{consumer-id}
This test must check:
* whether the persona can delete a containers consumer by id
"""
raise NotImplementedError
@abc.abstractmethod
def test_add_secret_to_container(self):
"""Test add_secret_to_container policy
Testing: POST /v1/containers/{container-id}/secrets
Thist test must check:
This test must check:
* whether the persona can add a secret to a container
"""
raise NotImplementedError
@ -77,93 +157,183 @@ class BarbicanV1RbacContainers:
"""Test delete_secret_from_container policy
Testing: DELETE /v1/containers/{container-id}/secrets
Thist test must check:
This test must check:
* whether the persona can delete a secret from a container
"""
raise NotImplementedError
class ProjectMemberTests(base.BarbicanV1RbacBase, BarbicanV1RbacContainers):
class ProjectReaderTests(base.BarbicanV1RbacBase, BarbicanV1RbacContainers):
@classmethod
def setup_clients(cls):
super().setup_clients()
cls.client = cls.os_project_reader.secret_v1.ContainerClient()
cls.secret_client = cls.os_project_reader.secret_v1.SecretClient()
cls.consumer_client = cls.os_project_reader.secret_v1.ConsumerClient(
service='key-manager')
def test_list_containers(self):
self.assertRaises(
exceptions.Forbidden,
self.client.list_containers)
def test_create_container(self):
self.assertRaises(
exceptions.Forbidden,
self.client.create_container)
def test_get_container(self):
resp = self.create_empty_container_admin('test_reader_get_container')
container_id = self.ref_to_uuid(resp['container_ref'])
self.assertRaises(
exceptions.Forbidden,
self.client.get_container,
container_id=container_id)
def test_delete_container(self):
resp = self.create_empty_container_admin(
'test_reader_delete_container')
container_id = self.ref_to_uuid(resp['container_ref'])
self.assertRaises(
exceptions.Forbidden,
self.client.delete_container,
container_id=container_id)
def test_get_container_acl(self):
pass
def test_update_container_acl(self):
pass
def test_create_container_acl(self):
pass
def test_delete_container_acl(self):
pass
def test_list_container_consumers(self):
pass
def test_create_container_consumer(self):
pass
def test_get_container_consumer(self):
pass
def test_delete_container_consumer(self):
pass
def test_add_secret_to_container(self):
resp = self.create_empty_container_admin(
'test_reader_add_secret_to_container_container')
container_id = self.ref_to_uuid(resp['container_ref'])
resp = self.create_empty_secret_admin(
'test_reader_add_secret_to_container_secret')
secret_id = self.ref_to_uuid(resp['secret_ref'])
self.assertRaises(
exceptions.Forbidden,
self.client.add_secret_to_container,
container_id=container_id,
secret_id=secret_id)
def test_delete_secret_from_container(self):
resp = self.create_empty_container_admin(
'test_reader_delete_secret_from_container_container')
container_id = self.ref_to_uuid(resp['container_ref'])
resp = self.create_empty_secret_admin(
'test_reader_delete_secret_from_container_secret')
secret_id = self.ref_to_uuid(resp['secret_ref'])
self.assertRaises(
exceptions.Forbidden,
self.client.delete_secret_from_container,
container_id=container_id,
secret_id=secret_id)
class ProjectMemberTests(ProjectReaderTests):
@classmethod
def setup_clients(cls):
super().setup_clients()
cls.client = cls.os_project_member.secret_v1.ContainerClient()
cls.secret_client = cls.os_project_member.secret_v1.SecretClient()
cls.consumer_client = cls.os_project_member.secret_v1.ConsumerClient()
def test_list_containers(self):
self.do_request('create_container', cleanup='container',
name='list_containers', type='generic')
resp = self.do_request('list_containers')
self.client.create_container(
name='test_list_containers',
type='generic')
resp = self.client.list_containers(name='test_list_containers')
containers = resp['containers']
self.assertGreaterEqual(len(containers), 1)
def test_create_container(self):
self.do_request('create_container', cleanup='container',
name='create_container', type='generic')
self.client.create_container(
name='test_create_containers',
type='generic')
def test_get_container(self):
resp = self.do_request('create_container', cleanup='container',
name='get_container', type='generic')
resp = self.client.create_container(
name='get_container',
type='generic')
container_id = self.ref_to_uuid(resp['container_ref'])
resp = self.do_request('get_container', container_id=container_id)
resp = self.client.get_container(container_id=container_id)
self.assertEqual(container_id, self.ref_to_uuid(resp['container_ref']))
def test_delete_container(self):
resp = self.do_request('create_container', name='delete_container',
type='generic')
resp = self.client.create_container(
name='delete_container',
type='generic')
container_id = self.ref_to_uuid(resp['container_ref'])
resp = self.do_request('delete_container', container_id=container_id)
self.client.delete_container(container_id)
def test_add_secret_to_container(self):
resp = self.do_request('create_container', cleanup='container',
name='add_secret_to_container_c',
type='generic')
resp = self.client.create_container(
name='add_secret_to_container_c',
type='generic')
container_id = self.ref_to_uuid(resp['container_ref'])
resp = self.do_request(
'create_secret',
client=self.secret_client,
resp = self.secret_client.create_secret(
cleanup='secret',
name='add_secret_to_container_s',
secret_type='passphrase',
payload='shhh... secret',
payload_content_type='text/plain'
)
secret_id = self.ref_to_uuid(resp['secret_ref'])
payload_content_type='text/plain')
resp = self.do_request('add_secret_to_container',
container_id=container_id,
secret_id=secret_id)
secret_id = self.ref_to_uuid(resp['secret_ref'])
self.client.add_secret_to_container(
container_id=container_id,
secret_id=secret_id)
def test_delete_secret_from_container(self):
resp = self.do_request('create_container', cleanup='container',
name='delete_secret_from_container_c',
type='generic')
resp = self.client.create_container(
name='add_secret_to_container_c',
type='generic')
container_id = self.ref_to_uuid(resp['container_ref'])
resp = self.do_request(
'create_secret',
client=self.secret_client,
resp = self.secret_client.create_secret(
cleanup='secret',
name='delete_secret_from_container_s',
name='add_secret_to_container_s',
secret_type='passphrase',
payload='shhh... secret',
payload_content_type='text/plain'
)
payload_content_type='text/plain')
secret_id = self.ref_to_uuid(resp['secret_ref'])
self.do_request('add_secret_to_container',
container_id=container_id,
secret_id=secret_id)
resp = self.do_request('delete_secret_from_container',
container_id=container_id,
secret_id=secret_id)
self.client.add_secret_to_container(
container_id=container_id,
secret_id=secret_id)
self.client.delete_secret_from_container(
container_id=container_id,
secret_id=secret_id)
class ProjectAdminTests(ProjectMemberTests):
@ -173,103 +343,4 @@ class ProjectAdminTests(ProjectMemberTests):
super().setup_clients()
cls.client = cls.os_project_admin.secret_v1.ContainerClient()
cls.secret_client = cls.os_project_admin.secret_v1.SecretClient()
class ProjectReaderTests(base.BarbicanV1RbacBase, BarbicanV1RbacContainers):
@classmethod
def setup_clients(cls):
super().setup_clients()
cls.client = cls.os_project_reader.secret_v1.ContainerClient()
def test_list_containers(self):
self.do_request('list_containers',
expected_status=exceptions.Forbidden)
def test_create_container(self):
self.do_request('create_container',
expected_status=exceptions.Forbidden,
name='create_container',
type='generic')
def test_get_container(self):
resp = self.do_request(
'create_container',
client=self.os_project_member.secret_v1.ContainerClient(),
cleanup='container',
name='create_container', type='generic'
)
container_id = self.ref_to_uuid(resp['container_ref'])
self.do_request('get_container', expected_status=exceptions.Forbidden,
container_id=container_id)
def test_delete_container(self):
resp = self.do_request(
'create_container',
client=self.os_project_member.secret_v1.ContainerClient(),
cleanup='container',
name='delete_container', type='generic'
)
container_id = self.ref_to_uuid(resp['container_ref'])
self.do_request('delete_container',
expected_status=exceptions.Forbidden,
container_id=container_id)
def test_add_secret_to_container(self):
resp = self.do_request(
'create_container',
client=self.os_project_member.secret_v1.ContainerClient(),
cleanup='container',
name='add_secret_to_container_c', type='generic'
)
container_id = self.ref_to_uuid(resp['container_ref'])
resp = self.do_request(
'create_secret',
client=self.os_project_member.secret_v1.SecretClient(),
cleanup='secret',
name='add_secret_to_container_s',
secret_type='passphrase',
payload='shhh... secret',
payload_content_type='text/plain'
)
secret_id = self.ref_to_uuid(resp['secret_ref'])
self.do_request('add_secret_to_container',
expected_status=exceptions.Forbidden,
container_id=container_id,
secret_id=secret_id)
def test_delete_secret_from_container(self):
resp = self.do_request(
'create_container',
client=self.os_project_member.secret_v1.ContainerClient(),
cleanup='container',
name='delete_secret_from_container_c', type='generic'
)
container_id = self.ref_to_uuid(resp['container_ref'])
resp = self.do_request(
'create_secret',
client=self.os_project_member.secret_v1.SecretClient(),
cleanup='secret',
name='delete_secret_from_container_s',
secret_type='passphrase',
payload='shhh... secret',
payload_content_type='text/plain'
)
secret_id = self.ref_to_uuid(resp['secret_ref'])
self.do_request(
'add_secret_to_container',
client=self.os_project_member.secret_v1.ContainerClient(),
container_id=container_id,
secret_id=secret_id
)
self.do_request('delete_secret_from_container',
expected_status=exceptions.Forbidden,
container_id=container_id,
secret_id=secret_id)
cls.consumer_client = cls.os_project_member.secret_v1.ConsumerClient()