diff --git a/barbican_tempest_plugin/config.py b/barbican_tempest_plugin/config.py index 0c4a2ac..67a7986 100644 --- a/barbican_tempest_plugin/config.py +++ b/barbican_tempest_plugin/config.py @@ -54,3 +54,14 @@ ImageSignatureVerificationGroup = [ help="Does the test environment enforce glance image " "verification?"), ] + +barbican_rbac_scope_verification_group = cfg.OptGroup( + name="barbican_rbac_scope_verification", + title="Barbican RBAC Verification Options") + +BarbicanRBACScopeVerificationGroup = [ + cfg.BoolOpt('enforce_scope', + default=False, + help="Does barbican enforce scope and user " + "scope-aware policies?"), +] diff --git a/barbican_tempest_plugin/plugin.py b/barbican_tempest_plugin/plugin.py index 1914ecb..2acd7cc 100644 --- a/barbican_tempest_plugin/plugin.py +++ b/barbican_tempest_plugin/plugin.py @@ -39,6 +39,12 @@ class BarbicanTempestPlugin(plugins.TempestPlugin): project_config.ephemeral_storage_encryption_group) conf.register_opts(project_config.ImageSignatureVerificationGroup, project_config.image_signature_verification_group) + conf.register_group( + project_config.barbican_rbac_scope_verification_group) + conf.register_opts( + project_config.BarbicanRBACScopeVerificationGroup, + project_config.barbican_rbac_scope_verification_group + ) def get_opt_lists(self): return [('service_available', [project_config.service_option])] diff --git a/barbican_tempest_plugin/tests/api/base.py b/barbican_tempest_plugin/tests/api/base.py index 7256a10..2599480 100644 --- a/barbican_tempest_plugin/tests/api/base.py +++ b/barbican_tempest_plugin/tests/api/base.py @@ -83,6 +83,11 @@ class BaseKeyManagerTest(test.BaseTestCase): os = getattr(cls, 'os_roles_%s' % cls.credentials[1][0]) cls.quota_client = os.secret_v1.QuotaClient(service='key-manager') + @classmethod + def setup_credentials(cls): + super().setup_credentials() + cls.os_primary = getattr(cls, f'os_{cls.credentials[0]}') + @classmethod def resource_setup(cls): super(BaseKeyManagerTest, cls).resource_setup() diff --git a/barbican_tempest_plugin/tests/rbac/__init__.py b/barbican_tempest_plugin/tests/rbac/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/barbican_tempest_plugin/tests/rbac/v1/__init__.py b/barbican_tempest_plugin/tests/rbac/v1/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/barbican_tempest_plugin/tests/rbac/v1/base.py b/barbican_tempest_plugin/tests/rbac/v1/base.py new file mode 100644 index 0000000..5cddb7e --- /dev/null +++ b/barbican_tempest_plugin/tests/rbac/v1/base.py @@ -0,0 +1,149 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from tempest import config + +CONF = config.CONF + +RESOURCE_TYPES = ['container', 'order', 'quota', 'secret'] + + +def _get_uuid(href): + return href.split('/')[-1] + + +class BarbicanV1RbacBase(object): + + identity_version = 'v3' + created_objects = {} + + @classmethod + def skip_checks(cls): + super().skip_checks() + if not CONF.barbican_rbac_scope_verification.enforce_scope: + raise cls.skipException("enforce_scope is not enabled for " + "barbican, skipping RBAC tests") + + @classmethod + def setup_clients(cls): + super().setup_clients() + + # setup clients for primary persona + os = getattr(cls, f'os_{cls.credentials[0]}') + cls.secret_client = os.secret_v1.SecretClient(service='key-manager') + cls.secret_metadata_client = os.secret_v1.SecretMetadataClient( + service='key-manager' + ) + cls.consumer_client = os.secret_v1.ConsumerClient( + service='key-manager' + ) + cls.container_client = os.secret_v1.ContainerClient( + service='key-manager' + ) + cls.order_client = os.secret_v1.OrderClient(service='key-manager') + cls.quota_client = os.secret_v1.QuotaClient(service='key-manager') + cls.secret_client = os.secret_v1.SecretClient(service='key-manager') + cls.secret_metadata_client = os.secret_v1.SecretMetadataClient( + service='key-manager' + ) + + # setup clients for admin persona + # this client is used for any cleanupi/setup etc. as needed + adm = getattr(cls, f'os_{cls.credentials[1]}') + cls.admin_secret_client = adm.secret_v1.SecretClient( + service='key-manager') + cls.admin_secret_metadata_client = adm.secret_v1.SecretMetadataClient( + service='key-manager' + ) + cls.admin_consumer_client = adm.secret_v1.ConsumerClient( + service='key-manager' + ) + cls.admin_container_client = adm.secret_v1.ContainerClient( + service='key-manager' + ) + cls.admin_order_client = adm.secret_v1.OrderClient( + service='key-manager' + ) + cls.admin_quota_client = adm.secret_v1.QuotaClient( + service='key-manager' + ) + cls.admin_secret_client = adm.secret_v1.SecretClient( + service='key-manager' + ) + cls.admin_secret_metadata_client = adm.secret_v1.SecretMetadataClient( + service='key-manager' + ) + + @classmethod + def setup_credentials(cls): + super().setup_credentials() + cls.os_primary = getattr(cls, f'os_{cls.credentials[0]}') + + @classmethod + def resource_setup(cls): + super().resource_setup() + for resource in RESOURCE_TYPES: + cls.created_objects[resource] = set() + + @classmethod + def resource_cleanup(cls): + try: + for container_uuid in list(cls.created_objects['container']): + cls.admin_container_client.delete_container(container_uuid) + cls.created_objects['container'].remove(container_uuid) + for order_uuid in list(cls.created_objects['order']): + cls.admin_order_client.delete_order(order_uuid) + cls.created_objects['order'].remove(order_uuid) + for quota_uuid in list(cls.created_objects['quota']): + cls.admin_quota_client.delete_project_quota(quota_uuid) + cls.created_objects['quota'].remove(quota_uuid) + for secret_uuid in list(cls.created_objects['secret']): + cls.admin_secret_client.delete_secret(secret_uuid) + cls.created_objects['secret'].remove(secret_uuid) + finally: + super(BarbicanV1RbacBase, cls).resource_cleanup() + + @classmethod + def add_cleanup(cls, resource, response): + if resource == 'container': + uuid = _get_uuid(response['container_ref']) + if resource == 'order': + uuid = _get_uuid(response.get('order_ref')) + order_metadata = cls.get_order(uuid) + secret_ref = order_metadata.get('secret_ref') + if secret_ref: + cls.created_objects['secret'].add(_get_uuid(secret_ref)) + uuid = _get_uuid(response['order_ref']) + if resource == 'quota': + uuid = _get_uuid(response['quota_ref']) + if resource == 'secret': + uuid = _get_uuid(response['secret_ref']) + cls.created_objects[resource].add(uuid) + + @classmethod + def delete_cleanup(cls, resource, uuid): + cls.created_objects[resource].remove(uuid) + + def do_request(self, method, client=None, expected_status=200, + cleanup=None, **args): + if client is None: + client = self.client + if isinstance(expected_status, type(Exception)): + self.assertRaises(expected_status, + getattr(client, method), + **args) + else: + response = getattr(client, method)(**args) + self.assertEqual(response.response.status, expected_status) + if cleanup is not None: + self.add_cleanup(cleanup, response) + return response diff --git a/barbican_tempest_plugin/tests/rbac/v1/test_secrets.py b/barbican_tempest_plugin/tests/rbac/v1/test_secrets.py new file mode 100644 index 0000000..3d6c6e3 --- /dev/null +++ b/barbican_tempest_plugin/tests/rbac/v1/test_secrets.py @@ -0,0 +1,359 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc +import base64 +from datetime import datetime +from datetime import timedelta +import os + +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC + +from tempest import config +from tempest.lib import exceptions + +from barbican_tempest_plugin.tests.rbac.v1 import base as rbac_base + +CONF = config.CONF + + +def create_aes_key(): + password = b"password" + salt = os.urandom(16) + kdf = PBKDF2HMAC( + algorithm=hashes.SHA256(), length=32, salt=salt, + iterations=1000, backend=default_backend() + ) + return base64.b64encode(kdf.derive(password)) + + +class BarbicanV1RbacSecretsBase(rbac_base.BarbicanV1RbacBase, + metaclass=abc.ABCMeta): + + @classmethod + def setup_clients(cls): + super().setup_clients() + cls.client = cls.secret_client + + def create_empty_secret_admin(self, secret_name): + """add empty secret as admin user """ + return self.do_request( + 'create_secret', client=self.admin_secret_client, + expected_status=201, cleanup='secret', name=secret_name) + + def create_aes_secret_admin(self, secret_name): + key = create_aes_key() + expire_time = (datetime.utcnow() + timedelta(days=5)) + return key, self.do_request( + 'create_secret', client=self.admin_secret_client, + expected_status=201, cleanup="secret", + expiration=expire_time.isoformat(), algorithm="aes", + bit_length=256, mode="cbc", payload=key, + payload_content_type="application/octet-stream", + payload_content_encoding="base64", + name=secret_name + ) + + @abc.abstractmethod + def test_create_secret(self): + """Test add_secret policy. + + Testing: POST /v1/secrets + This test must check: + * whether the persona can create an empty secret + * whether the persona can create a secret with a symmetric key + """ + pass + + @abc.abstractmethod + def test_list_secrets(self): + """Test get_secrets policy. + + Testing: GET /v1/secrets + This test must check: + * whether the persona can list secrets within their project + """ + pass + + @abc.abstractmethod + def test_delete_secret(self): + """Test deleting a secret. + + Testing: DEL /v1/secrets/{secret_id} + This test must check: + * whether the persona can delete a secret in their project + """ + pass + + @abc.abstractmethod + def test_get_secret(self): + """Test get_secret policy. + + Testing: GET /v1/secrets/{secret_id} + This test must check: + * whether the persona can get a specific secret within their project + """ + pass + + @abc.abstractmethod + def test_get_secret_payload(self): + """Test get_secret payload policy. + + Testing: GET /v1/secrets/{secret_id}/payload + This test must check: + * whether the persona can get a secret payload + """ + pass + + @abc.abstractmethod + def test_put_secret_payload(self): + """Test put_secret policy. + + Testing: PUT /v1/secrets/{secret_id} + This test must check: + * whether the persona can add a paylod to an empty secret + """ + pass + + +class ProjectMemberTests(BarbicanV1RbacSecretsBase): + credentials = ['project_member', 'project_admin'] + + def test_create_secret(self): + """Test add_secret policy.""" + self.do_request('create_secret', expected_status=201, cleanup='secret') + + key = create_aes_key() + expire_time = (datetime.utcnow() + timedelta(days=5)) + self.do_request( + 'create_secret', expected_status=201, cleanup="secret", + expiration=expire_time.isoformat(), algorithm="aes", + bit_length=256, mode="cbc", payload=key, + payload_content_type="application/octet-stream", + payload_content_encoding="base64" + ) + + def test_list_secrets(self): + """Test get_secrets policy.""" + # create two secrets + self.create_empty_secret_admin('secret_1') + self.create_empty_secret_admin('secret_2') + + # list secrets with name secret_1 + resp = self.do_request('list_secrets', name='secret_1') + secrets = resp['secrets'] + self.assertEqual('secret_1', secrets[0]['name']) + + # list secrets with name secret_2 + resp = self.do_request('list_secrets', name='secret_2') + secrets = resp['secrets'] + self.assertEqual('secret_2', secrets[0]['name']) + + # list all secrets + resp = self.do_request('list_secrets') + secrets = resp['secrets'] + self.assertEqual(len(secrets), 2) + + def test_delete_secret(self): + """Test delete_secrets policy.""" + sec = self.create_empty_secret_admin('secret_1') + uuid = rbac_base._get_uuid(sec['secret_ref']) + self.do_request('delete_secret', secret_id=uuid) + self.delete_cleanup('secret', uuid) + + def test_get_secret(self): + """Test get_secret policy.""" + sec = self.create_empty_secret_admin('secret_1') + uuid = rbac_base._get_uuid(sec['secret_ref']) + resp = self.do_request('get_secret_metadata', secret_id=uuid) + self.assertEqual(uuid, rbac_base._get_uuid(resp['secret_ref'])) + + def test_get_secret_payload(self): + """Test get_secret payload policy.""" + key, sec = self.create_aes_secret_admin('secret_1') + uuid = rbac_base._get_uuid(sec['secret_ref']) + + # Retrieve the payload + payload = self.do_request('get_secret_payload', secret_id=uuid) + self.assertEqual(key, base64.b64encode(payload)) + + def test_put_secret_payload(self): + """Test put_secret policy.""" + sec = self.create_empty_secret_admin('secret_1') + uuid = rbac_base._get_uuid(sec['secret_ref']) + + key = create_aes_key() + + # Associate the payload with the created secret + self.do_request('put_secret_payload', secret_id=uuid, payload=key) + + # Retrieve the payload + payload = self.do_request('get_secret_payload', secret_id=uuid) + self.assertEqual(key, base64.b64encode(payload)) + + +class ProjectAdminTests(ProjectMemberTests): + credentials = ['project_admin', 'project_admin'] + + +class ProjectReaderTests(BarbicanV1RbacSecretsBase): + credentials = ['project_reader', 'project_admin'] + + def test_create_secret(self): + """Test add_secret policy.""" + self.do_request( + 'create_secret', expected_status=exceptions.Forbidden, + cleanup='secret') + + key = create_aes_key() + expire_time = (datetime.utcnow() + timedelta(days=5)) + self.do_request( + 'create_secret', expected_status=exceptions.Forbidden, + cleanup="secret", + expiration=expire_time.isoformat(), algorithm="aes", + bit_length=256, mode="cbc", payload=key, + payload_content_type="application/octet-stream", + payload_content_encoding="base64" + ) + + def test_list_secrets(self): + """Test get_secrets policy.""" + # create two secrets + self.create_empty_secret_admin('secret_1') + self.create_empty_secret_admin('secret_2') + + # list secrets with name secret_1 + self.do_request( + 'list_secrets', expected_status=exceptions.Forbidden, + name='secret_1' + ) + + # list secrets with name secret_2 + self.do_request( + 'list_secrets', expected_status=exceptions.Forbidden, + name='secret_2' + ) + + # list all secrets + self.do_request( + 'list_secrets', expected_status=exceptions.Forbidden + ) + + def test_delete_secret(self): + """Test delete_secrets policy.""" + sec = self.create_empty_secret_admin('secret_1') + uuid = rbac_base._get_uuid(sec['secret_ref']) + self.do_request( + 'delete_secret', expected_status=exceptions.Forbidden, + secret_id=uuid + ) + + def test_get_secret(self): + """Test get_secret policy.""" + sec = self.create_empty_secret_admin('secret_1') + uuid = rbac_base._get_uuid(sec['secret_ref']) + self.do_request( + 'get_secret_metadata', expected_status=exceptions.Forbidden, + secret_id=uuid + ) + + def test_get_secret_payload(self): + """Test get_secret payload policy.""" + key, sec = self.create_aes_secret_admin('secret_1') + uuid = rbac_base._get_uuid(sec['secret_ref']) + + # Retrieve the payload + self.do_request( + 'get_secret_payload', expected_status=exceptions.Forbidden, + secret_id=uuid + ) + + def test_put_secret_payload(self): + """Test put_secret policy.""" + sec = self.create_empty_secret_admin('secret_1') + uuid = rbac_base._get_uuid(sec['secret_ref']) + + key = create_aes_key() + + # Associate the payload with the created secret + self.do_request( + 'put_secret_payload', expected_status=exceptions.Forbidden, + secret_id=uuid, payload=key + ) + + +class SystemAdminTests(BarbicanV1RbacSecretsBase): + credentials = ['system_admin', 'project_admin'] + + def test_create_secret(self): + pass + + def test_list_secrets(self): + pass + + def test_delete_secret(self): + pass + + def test_get_secret(self): + pass + + def test_get_secret_payload(self): + pass + + def test_put_secret_payload(self): + pass + + +class SystemMemberTests(BarbicanV1RbacSecretsBase): + credentials = ['system_member', 'project_admin'] + + def test_create_secret(self): + pass + + def test_list_secrets(self): + pass + + def test_delete_secret(self): + pass + + def test_get_secret(self): + pass + + def test_get_secret_payload(self): + pass + + def test_put_secret_payload(self): + pass + + +class SystemReaderTests(BarbicanV1RbacSecretsBase): + credentials = ['system_reader', 'project_admin'] + + def test_create_secret(self): + pass + + def test_list_secrets(self): + pass + + def test_delete_secret(self): + pass + + def test_get_secret(self): + pass + + def test_get_secret_payload(self): + pass + + def test_put_secret_payload(self): + pass