Add secure-rbac tests for Containers

This patch adds basic RBAC testing for the Containers resource for
reader, member and admin personas with project scope.

Depends-On: I6879f566117db5ec0099ddad35ba649a3c674bd1
Change-Id: I2272f5cd2385df15cc7761e8b858dfd39be955d4
This commit is contained in:
Douglas Mendizábal 2021-03-31 14:55:54 -05:00
parent d013596f3d
commit db9b512ead
5 changed files with 328 additions and 36 deletions

View File

@ -0,0 +1,22 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.lib.common import rest_client
_DEFAULT_SERVICE_TYPE = 'key-manager'
class BarbicanTempestClient(rest_client.RestClient):
def __init__(self, *args, **kwargs):
kwargs['service'] = _DEFAULT_SERVICE_TYPE
super().__init__(*args, **kwargs)

View File

@ -11,19 +11,17 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
from urllib import parse as urllib
from tempest import config
from tempest.lib.common import rest_client
from barbican_tempest_plugin.services.key_manager.json import base
CONF = config.CONF
class ContainerClient(rest_client.RestClient):
class ContainerClient(base.BarbicanTempestClient):
def list_containers(self, **kwargs):
uri = "v1/containers"

View File

@ -30,10 +30,6 @@ CONF = config.CONF
RESOURCE_TYPES = ['container', 'order', 'quota', 'secret']
def _get_uuid(href):
return href.split('/')[-1]
def create_aes_key():
password = b"password"
salt = os.urandom(16)
@ -50,8 +46,13 @@ class BarbicanV1RbacBase(test.BaseTestCase):
_created_projects = None
_created_users = None
created_objects = {}
credentials = ['system_admin']
@classmethod
def ref_to_uuid(cls, href):
return href.split('/')[-1]
@classmethod
def skip_checks(cls):
super().skip_checks()
@ -129,9 +130,7 @@ class BarbicanV1RbacBase(test.BaseTestCase):
cls.consumer_client = os.secret_v1.ConsumerClient(
service='key-manager'
)
cls.container_client = os.secret_v1.ContainerClient(
service='key-manager'
)
cls.container_client = os.secret_v1.ContainerClient()
cls.order_client = os.secret_v1.OrderClient(service='key-manager')
cls.quota_client = os.secret_v1.QuotaClient(service='key-manager')
cls.secret_client = os.secret_v1.SecretClient()
@ -149,9 +148,7 @@ class BarbicanV1RbacBase(test.BaseTestCase):
cls.admin_consumer_client = adm.secret_v1.ConsumerClient(
service='key-manager'
)
cls.admin_container_client = adm.secret_v1.ContainerClient(
service='key-manager'
)
cls.admin_container_client = adm.secret_v1.ContainerClient()
cls.admin_order_client = adm.secret_v1.OrderClient(
service='key-manager'
)
@ -192,18 +189,18 @@ class BarbicanV1RbacBase(test.BaseTestCase):
@classmethod
def add_cleanup(cls, resource, response):
if resource == 'container':
uuid = _get_uuid(response['container_ref'])
uuid = cls.ref_to_uuid(response['container_ref'])
if resource == 'order':
uuid = _get_uuid(response.get('order_ref'))
uuid = cls.ref_to_uuid(response.get('order_ref'))
order_metadata = cls.get_order(uuid)
secret_ref = order_metadata.get('secret_ref')
if secret_ref:
cls.created_objects['secret'].add(_get_uuid(secret_ref))
uuid = _get_uuid(response['order_ref'])
cls.created_objects['secret'].add(cls.ref_to_uuid(secret_ref))
uuid = cls.ref_to_uuid(response['order_ref'])
if resource == 'quota':
uuid = _get_uuid(response['quota_ref'])
uuid = cls.ref_to_uuid(response['quota_ref'])
if resource == 'secret':
uuid = _get_uuid(response['secret_ref'])
uuid = cls.ref_to_uuid(response['secret_ref'])
cls.created_objects[resource].add(uuid)
@classmethod

View File

@ -0,0 +1,275 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
from tempest import config
from tempest.lib import exceptions
from barbican_tempest_plugin.tests.rbac.v1 import base
CONF = config.CONF
class BarbicanV1RbacContainers:
@abc.abstractmethod
def test_list_containers(self):
"""Test list_containers policy
Testing: GET /v1/containers
This test must check:
* whether the persona can list containers
"""
raise NotImplementedError
@abc.abstractmethod
def test_create_container(self):
"""Test create_container policy
Testing: POST /v1/containers
Thist test must check:
* whether the persona can create a new container
"""
raise NotImplementedError
@abc.abstractmethod
def test_get_container(self):
"""Test get_container policy
Testing: GET /v1/containers/{container-id}
Thist test must check:
* whether the persona can get a container
"""
raise NotImplementedError
@abc.abstractmethod
def test_delete_container(self):
"""Test delete_container policy
Testing: DELETE /v1/containers/{container-id}
Thist test must check:
* whether the persona can delete a container
"""
raise NotImplementedError
@abc.abstractmethod
def test_add_secret_to_container(self):
"""Test add_secret_to_container policy
Testing: POST /v1/containers/{container-id}/secrets
Thist test must check:
* whether the persona can add a secret to a container
"""
raise NotImplementedError
@abc.abstractmethod
def test_delete_secret_from_container(self):
"""Test delete_secret_from_container policy
Testing: DELETE /v1/containers/{container-id}/secrets
Thist test must check:
* whether the persona can delete a secret from a container
"""
raise NotImplementedError
class ProjectMemberTests(base.BarbicanV1RbacBase, BarbicanV1RbacContainers):
@classmethod
def setup_clients(cls):
super().setup_clients()
cls.client = cls.os_project_member.secret_v1.ContainerClient()
cls.secret_client = cls.os_project_member.secret_v1.SecretClient()
def test_list_containers(self):
self.do_request('create_container', cleanup='container',
name='list_containers', type='generic')
resp = self.do_request('list_containers')
containers = resp['containers']
self.assertGreaterEqual(len(containers), 1)
def test_create_container(self):
self.do_request('create_container', cleanup='container',
name='create_container', type='generic')
def test_get_container(self):
resp = self.do_request('create_container', cleanup='container',
name='get_container', type='generic')
container_id = self.ref_to_uuid(resp['container_ref'])
resp = self.do_request('get_container', container_id=container_id)
self.assertEqual(container_id, self.ref_to_uuid(resp['container_ref']))
def test_delete_container(self):
resp = self.do_request('create_container', name='delete_container',
type='generic')
container_id = self.ref_to_uuid(resp['container_ref'])
resp = self.do_request('delete_container', container_id=container_id)
def test_add_secret_to_container(self):
resp = self.do_request('create_container', cleanup='container',
name='add_secret_to_container_c',
type='generic')
container_id = self.ref_to_uuid(resp['container_ref'])
resp = self.do_request(
'create_secret',
client=self.secret_client,
cleanup='secret',
name='add_secret_to_container_s',
secret_type='passphrase',
payload='shhh... secret',
payload_content_type='text/plain'
)
secret_id = self.ref_to_uuid(resp['secret_ref'])
resp = self.do_request('add_secret_to_container',
container_id=container_id,
secret_id=secret_id)
def test_delete_secret_from_container(self):
resp = self.do_request('create_container', cleanup='container',
name='delete_secret_from_container_c',
type='generic')
container_id = self.ref_to_uuid(resp['container_ref'])
resp = self.do_request(
'create_secret',
client=self.secret_client,
cleanup='secret',
name='delete_secret_from_container_s',
secret_type='passphrase',
payload='shhh... secret',
payload_content_type='text/plain'
)
secret_id = self.ref_to_uuid(resp['secret_ref'])
self.do_request('add_secret_to_container',
container_id=container_id,
secret_id=secret_id)
resp = self.do_request('delete_secret_from_container',
container_id=container_id,
secret_id=secret_id)
class ProjectAdminTests(ProjectMemberTests):
@classmethod
def setup_clients(cls):
super().setup_clients()
cls.client = cls.os_project_admin.secret_v1.ContainerClient()
cls.secret_client = cls.os_project_admin.secret_v1.SecretClient()
class ProjectReaderTests(base.BarbicanV1RbacBase, BarbicanV1RbacContainers):
@classmethod
def setup_clients(cls):
super().setup_clients()
cls.client = cls.os_project_reader.secret_v1.ContainerClient()
def test_list_containers(self):
self.do_request('list_containers',
expected_status=exceptions.Forbidden)
def test_create_container(self):
self.do_request('create_container',
expected_status=exceptions.Forbidden,
name='create_container',
type='generic')
def test_get_container(self):
resp = self.do_request(
'create_container',
client=self.os_project_member.secret_v1.ContainerClient(),
cleanup='container',
name='create_container', type='generic'
)
container_id = self.ref_to_uuid(resp['container_ref'])
self.do_request('get_container', expected_status=exceptions.Forbidden,
container_id=container_id)
def test_delete_container(self):
resp = self.do_request(
'create_container',
client=self.os_project_member.secret_v1.ContainerClient(),
cleanup='container',
name='delete_container', type='generic'
)
container_id = self.ref_to_uuid(resp['container_ref'])
self.do_request('delete_container',
expected_status=exceptions.Forbidden,
container_id=container_id)
def test_add_secret_to_container(self):
resp = self.do_request(
'create_container',
client=self.os_project_member.secret_v1.ContainerClient(),
cleanup='container',
name='add_secret_to_container_c', type='generic'
)
container_id = self.ref_to_uuid(resp['container_ref'])
resp = self.do_request(
'create_secret',
client=self.os_project_member.secret_v1.SecretClient(),
cleanup='secret',
name='add_secret_to_container_s',
secret_type='passphrase',
payload='shhh... secret',
payload_content_type='text/plain'
)
secret_id = self.ref_to_uuid(resp['secret_ref'])
self.do_request('add_secret_to_container',
expected_status=exceptions.Forbidden,
container_id=container_id,
secret_id=secret_id)
def test_delete_secret_from_container(self):
resp = self.do_request(
'create_container',
client=self.os_project_member.secret_v1.ContainerClient(),
cleanup='container',
name='delete_secret_from_container_c', type='generic'
)
container_id = self.ref_to_uuid(resp['container_ref'])
resp = self.do_request(
'create_secret',
client=self.os_project_member.secret_v1.SecretClient(),
cleanup='secret',
name='delete_secret_from_container_s',
secret_type='passphrase',
payload='shhh... secret',
payload_content_type='text/plain'
)
secret_id = self.ref_to_uuid(resp['secret_ref'])
self.do_request(
'add_secret_to_container',
client=self.os_project_member.secret_v1.ContainerClient(),
container_id=container_id,
secret_id=secret_id
)
self.do_request('delete_secret_from_container',
expected_status=exceptions.Forbidden,
container_id=container_id,
secret_id=secret_id)

View File

@ -34,7 +34,7 @@ class BarbicanV1RbacSecrets(metaclass=abc.ABCMeta):
* whether the persona can create an empty secret
* whether the persona can create a secret with a symmetric key
"""
pass
raise NotImplementedError
@abc.abstractmethod
def test_list_secrets(self):
@ -44,7 +44,7 @@ class BarbicanV1RbacSecrets(metaclass=abc.ABCMeta):
This test must check:
* whether the persona can list secrets within their project
"""
pass
raise NotImplementedError
@abc.abstractmethod
def test_delete_secret(self):
@ -54,7 +54,7 @@ class BarbicanV1RbacSecrets(metaclass=abc.ABCMeta):
This test must check:
* whether the persona can delete a secret in their project
"""
pass
raise NotImplementedError
@abc.abstractmethod
def test_get_secret(self):
@ -64,7 +64,7 @@ class BarbicanV1RbacSecrets(metaclass=abc.ABCMeta):
This test must check:
* whether the persona can get a specific secret within their project
"""
pass
raise NotImplementedError
@abc.abstractmethod
def test_get_secret_payload(self):
@ -74,7 +74,7 @@ class BarbicanV1RbacSecrets(metaclass=abc.ABCMeta):
This test must check:
* whether the persona can get a secret payload
"""
pass
raise NotImplementedError
@abc.abstractmethod
def test_put_secret_payload(self):
@ -84,7 +84,7 @@ class BarbicanV1RbacSecrets(metaclass=abc.ABCMeta):
This test must check:
* whether the persona can add a paylod to an empty secret
"""
pass
raise NotImplementedError
class ProjectMemberTests(rbac_base.BarbicanV1RbacBase, BarbicanV1RbacSecrets):
@ -134,21 +134,21 @@ class ProjectMemberTests(rbac_base.BarbicanV1RbacBase, BarbicanV1RbacSecrets):
def test_delete_secret(self):
"""Test delete_secrets policy."""
sec = self.create_empty_secret_admin('test_delete_secret_1')
uuid = rbac_base._get_uuid(sec['secret_ref'])
uuid = self.ref_to_uuid(sec['secret_ref'])
self.do_request('delete_secret', secret_id=uuid)
self.delete_cleanup('secret', uuid)
def test_get_secret(self):
"""Test get_secret policy."""
sec = self.create_empty_secret_admin('test_get_secret')
uuid = rbac_base._get_uuid(sec['secret_ref'])
uuid = self.ref_to_uuid(sec['secret_ref'])
resp = self.do_request('get_secret_metadata', secret_id=uuid)
self.assertEqual(uuid, rbac_base._get_uuid(resp['secret_ref']))
self.assertEqual(uuid, self.ref_to_uuid(resp['secret_ref']))
def test_get_secret_payload(self):
"""Test get_secret payload policy."""
key, sec = self.create_aes_secret_admin('test_get_secret_payload')
uuid = rbac_base._get_uuid(sec['secret_ref'])
uuid = self.ref_to_uuid(sec['secret_ref'])
# Retrieve the payload
payload = self.do_request('get_secret_payload', secret_id=uuid)
@ -157,7 +157,7 @@ class ProjectMemberTests(rbac_base.BarbicanV1RbacBase, BarbicanV1RbacSecrets):
def test_put_secret_payload(self):
"""Test put_secret policy."""
sec = self.create_empty_secret_admin('test_put_secret_payload')
uuid = rbac_base._get_uuid(sec['secret_ref'])
uuid = self.ref_to_uuid(sec['secret_ref'])
key = rbac_base.create_aes_key()
@ -226,7 +226,7 @@ class ProjectReaderTests(rbac_base.BarbicanV1RbacBase, BarbicanV1RbacSecrets):
def test_delete_secret(self):
"""Test delete_secrets policy."""
sec = self.create_empty_secret_admin('secret_1')
uuid = rbac_base._get_uuid(sec['secret_ref'])
uuid = self.ref_to_uuid(sec['secret_ref'])
self.do_request(
'delete_secret', expected_status=exceptions.Forbidden,
secret_id=uuid
@ -235,7 +235,7 @@ class ProjectReaderTests(rbac_base.BarbicanV1RbacBase, BarbicanV1RbacSecrets):
def test_get_secret(self):
"""Test get_secret policy."""
sec = self.create_empty_secret_admin('secret_1')
uuid = rbac_base._get_uuid(sec['secret_ref'])
uuid = self.ref_to_uuid(sec['secret_ref'])
self.do_request(
'get_secret_metadata', expected_status=exceptions.Forbidden,
secret_id=uuid
@ -244,7 +244,7 @@ class ProjectReaderTests(rbac_base.BarbicanV1RbacBase, BarbicanV1RbacSecrets):
def test_get_secret_payload(self):
"""Test get_secret payload policy."""
key, sec = self.create_aes_secret_admin('secret_1')
uuid = rbac_base._get_uuid(sec['secret_ref'])
uuid = self.ref_to_uuid(sec['secret_ref'])
# Retrieve the payload
self.do_request(
@ -255,7 +255,7 @@ class ProjectReaderTests(rbac_base.BarbicanV1RbacBase, BarbicanV1RbacSecrets):
def test_put_secret_payload(self):
"""Test put_secret policy."""
sec = self.create_empty_secret_admin('secret_1')
uuid = rbac_base._get_uuid(sec['secret_ref'])
uuid = self.ref_to_uuid(sec['secret_ref'])
key = rbac_base.create_aes_key()