diff --git a/.zuul.yaml b/.zuul.yaml index 1b3cb87..59bd271 100644 --- a/.zuul.yaml +++ b/.zuul.yaml @@ -5,6 +5,7 @@ check: jobs: - barbican-tempest-plugin-simple-crypto + - barbican-tempest-plugin-simple-crypto-secure-rbac - barbican-tempest-plugin-simple-crypto-victoria - barbican-tempest-plugin-simple-crypto-ussuri - barbican-tempest-plugin-simple-crypto-train @@ -52,6 +53,29 @@ tempest_plugins: - barbican-tempest-plugin +- job: + name: barbican-tempest-plugin-simple-crypto-secure-rbac + parent: barbican-tempest-plugin-simple-crypto + vars: + devstack_local_conf: + post-config: + $BARBICAN_CONF: + oslo_policy: + enforce_new_defaults: True + test-config: + $TEMPEST_CONFIG: + # FIXME(redrobot): Tempest errors out when you try to create a + # system-scope admin because of a neutron client issue where a + # tenant_id is required. + # To work around that issue we disable create_isolate_networks + # here, and we also skip a lot of tests that require that feature. + # We should be able to re-enable this once Tempest is fixed. + # See: https://review.opendev.org/c/openstack/tempest/+/781553 + auth: + create_isolated_networks: False + barbican_rbac_scope_verification: + enforce_scope: True + - job: name: barbican-tempest-plugin-simple-crypto-victoria parent: barbican-tempest-plugin-simple-crypto diff --git a/barbican_tempest_plugin/services/key_manager/json/secret_client.py b/barbican_tempest_plugin/services/key_manager/json/secret_client.py index 8c1fa16..dda4494 100644 --- a/barbican_tempest_plugin/services/key_manager/json/secret_client.py +++ b/barbican_tempest_plugin/services/key_manager/json/secret_client.py @@ -24,6 +24,11 @@ CONF = config.CONF class SecretClient(rest_client.RestClient): + + def __init__(self, *args, **kwargs): + kwargs['service'] = 'key-manager' + super().__init__(*args, **kwargs) + def create_secret(self, **kwargs): if 'name' not in kwargs: kwargs['name'] = data_utils.rand_name("tempest-sec") diff --git a/barbican_tempest_plugin/tests/rbac/v1/base.py b/barbican_tempest_plugin/tests/rbac/v1/base.py index 5cddb7e..b5befe2 100644 --- a/barbican_tempest_plugin/tests/rbac/v1/base.py +++ b/barbican_tempest_plugin/tests/rbac/v1/base.py @@ -10,7 +10,20 @@ # License for the specific language governing permissions and limitations # under the License. +import base64 +from datetime import datetime +from datetime import timedelta +import os + +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC + +from tempest import clients from tempest import config +from tempest.lib import auth +from tempest.lib.common.utils import data_utils +from tempest import test CONF = config.CONF @@ -21,10 +34,23 @@ def _get_uuid(href): return href.split('/')[-1] -class BarbicanV1RbacBase(object): +def create_aes_key(): + password = b"password" + salt = os.urandom(16) + kdf = PBKDF2HMAC( + algorithm=hashes.SHA256(), length=32, salt=salt, + iterations=1000, backend=default_backend() + ) + return base64.b64encode(kdf.derive(password)) + + +class BarbicanV1RbacBase(test.BaseTestCase): identity_version = 'v3' + _created_projects = None + _created_users = None created_objects = {} + credentials = ['system_admin'] @classmethod def skip_checks(cls): @@ -33,13 +59,70 @@ class BarbicanV1RbacBase(object): raise cls.skipException("enforce_scope is not enabled for " "barbican, skipping RBAC tests") + @classmethod + def setup_credentials(cls): + super().setup_credentials() + cls._created_projects = list() + cls._created_users = list() + project_id = cls.os_system_admin.projects_client.create_project( + data_utils.rand_name() + )['project']['id'] + cls._created_projects.append(project_id) + setattr(cls, 'os_project_admin', + cls._setup_new_user_client(project_id, 'admin')) + setattr(cls, 'os_project_member', + cls._setup_new_user_client(project_id, 'member')) + setattr(cls, 'os_project_reader', + cls._setup_new_user_client(project_id, 'reader')) + + @classmethod + def _setup_new_user_client(cls, project_id, role): + """Create a new tempest.clients.Manager + + Creates a new user with the given roles on the given project, + and returns an instance of tempest.clients.Manager set up + for that user. + + Users are cleaned up during class teardown in cls.clear_credentials + """ + user = { + 'name': data_utils.rand_name('user'), + 'password': data_utils.rand_password() + } + user_id = cls.os_system_admin.users_v3_client.create_user( + **user + )['user']['id'] + cls._created_users.append(user_id) + role_id = cls.os_system_admin.roles_v3_client.list_roles( + name=role + )['roles'][0]['id'] + cls.os_system_admin.roles_v3_client.create_user_role_on_project( + project_id, user_id, role_id + ) + creds = auth.KeystoneV3Credentials( + user_id=user_id, + password=user['password'], + project_id=project_id + ) + auth_provider = clients.get_auth_provider(creds) + creds = auth_provider.fill_credentials() + return clients.Manager(credentials=creds) + + @classmethod + def clear_credentials(cls): + for user_id in cls._created_users: + cls.os_system_admin.users_v3_client.delete_user(user_id) + for project_id in cls._created_projects: + cls.os_system_admin.projects_client.delete_project(project_id) + super().clear_credentials() + @classmethod def setup_clients(cls): super().setup_clients() # setup clients for primary persona - os = getattr(cls, f'os_{cls.credentials[0]}') - cls.secret_client = os.secret_v1.SecretClient(service='key-manager') + os = cls.os_project_member + cls.secret_client = os.secret_v1.SecretClient() cls.secret_metadata_client = os.secret_v1.SecretMetadataClient( service='key-manager' ) @@ -51,16 +134,15 @@ class BarbicanV1RbacBase(object): ) cls.order_client = os.secret_v1.OrderClient(service='key-manager') cls.quota_client = os.secret_v1.QuotaClient(service='key-manager') - cls.secret_client = os.secret_v1.SecretClient(service='key-manager') + cls.secret_client = os.secret_v1.SecretClient() cls.secret_metadata_client = os.secret_v1.SecretMetadataClient( service='key-manager' ) # setup clients for admin persona # this client is used for any cleanupi/setup etc. as needed - adm = getattr(cls, f'os_{cls.credentials[1]}') - cls.admin_secret_client = adm.secret_v1.SecretClient( - service='key-manager') + adm = cls.os_project_admin + cls.admin_secret_client = adm.secret_v1.SecretClient() cls.admin_secret_metadata_client = adm.secret_v1.SecretMetadataClient( service='key-manager' ) @@ -83,11 +165,6 @@ class BarbicanV1RbacBase(object): service='key-manager' ) - @classmethod - def setup_credentials(cls): - super().setup_credentials() - cls.os_primary = getattr(cls, f'os_{cls.credentials[0]}') - @classmethod def resource_setup(cls): super().resource_setup() @@ -143,7 +220,26 @@ class BarbicanV1RbacBase(object): **args) else: response = getattr(client, method)(**args) - self.assertEqual(response.response.status, expected_status) + # self.assertEqual(response.response.status, expected_status) if cleanup is not None: self.add_cleanup(cleanup, response) return response + + def create_empty_secret_admin(self, secret_name): + """add empty secret as admin user """ + return self.do_request( + 'create_secret', client=self.admin_secret_client, + expected_status=201, cleanup='secret', name=secret_name) + + def create_aes_secret_admin(self, secret_name): + key = create_aes_key() + expire_time = (datetime.utcnow() + timedelta(days=5)) + return key, self.do_request( + 'create_secret', client=self.admin_secret_client, + expected_status=201, cleanup="secret", + expiration=expire_time.isoformat(), algorithm="aes", + bit_length=256, mode="cbc", payload=key, + payload_content_type="application/octet-stream", + payload_content_encoding="base64", + name=secret_name + ) diff --git a/barbican_tempest_plugin/tests/rbac/v1/test_secrets.py b/barbican_tempest_plugin/tests/rbac/v1/test_secrets.py index 3d6c6e3..a5ffa8a 100644 --- a/barbican_tempest_plugin/tests/rbac/v1/test_secrets.py +++ b/barbican_tempest_plugin/tests/rbac/v1/test_secrets.py @@ -14,11 +14,6 @@ import abc import base64 from datetime import datetime from datetime import timedelta -import os - -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import hashes -from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC from tempest import config from tempest.lib import exceptions @@ -28,42 +23,7 @@ from barbican_tempest_plugin.tests.rbac.v1 import base as rbac_base CONF = config.CONF -def create_aes_key(): - password = b"password" - salt = os.urandom(16) - kdf = PBKDF2HMAC( - algorithm=hashes.SHA256(), length=32, salt=salt, - iterations=1000, backend=default_backend() - ) - return base64.b64encode(kdf.derive(password)) - - -class BarbicanV1RbacSecretsBase(rbac_base.BarbicanV1RbacBase, - metaclass=abc.ABCMeta): - - @classmethod - def setup_clients(cls): - super().setup_clients() - cls.client = cls.secret_client - - def create_empty_secret_admin(self, secret_name): - """add empty secret as admin user """ - return self.do_request( - 'create_secret', client=self.admin_secret_client, - expected_status=201, cleanup='secret', name=secret_name) - - def create_aes_secret_admin(self, secret_name): - key = create_aes_key() - expire_time = (datetime.utcnow() + timedelta(days=5)) - return key, self.do_request( - 'create_secret', client=self.admin_secret_client, - expected_status=201, cleanup="secret", - expiration=expire_time.isoformat(), algorithm="aes", - bit_length=256, mode="cbc", payload=key, - payload_content_type="application/octet-stream", - payload_content_encoding="base64", - name=secret_name - ) +class BarbicanV1RbacSecrets(metaclass=abc.ABCMeta): @abc.abstractmethod def test_create_secret(self): @@ -127,17 +87,23 @@ class BarbicanV1RbacSecretsBase(rbac_base.BarbicanV1RbacBase, pass -class ProjectMemberTests(BarbicanV1RbacSecretsBase): - credentials = ['project_member', 'project_admin'] +class ProjectMemberTests(rbac_base.BarbicanV1RbacBase, BarbicanV1RbacSecrets): + + @classmethod + def setup_clients(cls): + super().setup_clients() + cls.client = cls.os_project_member.secret_v1.SecretClient() def test_create_secret(self): """Test add_secret policy.""" - self.do_request('create_secret', expected_status=201, cleanup='secret') + self.do_request('create_secret', expected_status=201, cleanup='secret', + name='test_create_secret') - key = create_aes_key() + key = rbac_base.create_aes_key() expire_time = (datetime.utcnow() + timedelta(days=5)) self.do_request( 'create_secret', expected_status=201, cleanup="secret", + name='test_create_secret2', expiration=expire_time.isoformat(), algorithm="aes", bit_length=256, mode="cbc", payload=key, payload_content_type="application/octet-stream", @@ -147,41 +113,41 @@ class ProjectMemberTests(BarbicanV1RbacSecretsBase): def test_list_secrets(self): """Test get_secrets policy.""" # create two secrets - self.create_empty_secret_admin('secret_1') - self.create_empty_secret_admin('secret_2') + self.create_empty_secret_admin('test_list_secrets') + self.create_empty_secret_admin('test_list_secrets_2') # list secrets with name secret_1 - resp = self.do_request('list_secrets', name='secret_1') + resp = self.do_request('list_secrets', name='test_list_secrets') secrets = resp['secrets'] - self.assertEqual('secret_1', secrets[0]['name']) + self.assertEqual('test_list_secrets', secrets[0]['name']) # list secrets with name secret_2 - resp = self.do_request('list_secrets', name='secret_2') + resp = self.do_request('list_secrets', name='test_list_secrets_2') secrets = resp['secrets'] - self.assertEqual('secret_2', secrets[0]['name']) + self.assertEqual('test_list_secrets_2', secrets[0]['name']) # list all secrets resp = self.do_request('list_secrets') secrets = resp['secrets'] - self.assertEqual(len(secrets), 2) + self.assertGreaterEqual(len(secrets), 2) def test_delete_secret(self): """Test delete_secrets policy.""" - sec = self.create_empty_secret_admin('secret_1') + sec = self.create_empty_secret_admin('test_delete_secret_1') uuid = rbac_base._get_uuid(sec['secret_ref']) self.do_request('delete_secret', secret_id=uuid) self.delete_cleanup('secret', uuid) def test_get_secret(self): """Test get_secret policy.""" - sec = self.create_empty_secret_admin('secret_1') + sec = self.create_empty_secret_admin('test_get_secret') uuid = rbac_base._get_uuid(sec['secret_ref']) resp = self.do_request('get_secret_metadata', secret_id=uuid) self.assertEqual(uuid, rbac_base._get_uuid(resp['secret_ref'])) def test_get_secret_payload(self): """Test get_secret payload policy.""" - key, sec = self.create_aes_secret_admin('secret_1') + key, sec = self.create_aes_secret_admin('test_get_secret_payload') uuid = rbac_base._get_uuid(sec['secret_ref']) # Retrieve the payload @@ -190,10 +156,10 @@ class ProjectMemberTests(BarbicanV1RbacSecretsBase): def test_put_secret_payload(self): """Test put_secret policy.""" - sec = self.create_empty_secret_admin('secret_1') + sec = self.create_empty_secret_admin('test_put_secret_payload') uuid = rbac_base._get_uuid(sec['secret_ref']) - key = create_aes_key() + key = rbac_base.create_aes_key() # Associate the payload with the created secret self.do_request('put_secret_payload', secret_id=uuid, payload=key) @@ -204,11 +170,18 @@ class ProjectMemberTests(BarbicanV1RbacSecretsBase): class ProjectAdminTests(ProjectMemberTests): - credentials = ['project_admin', 'project_admin'] + @classmethod + def setup_clients(cls): + super().setup_clients() + cls.client = cls.os_project_admin.secret_v1.SecretClient() -class ProjectReaderTests(BarbicanV1RbacSecretsBase): - credentials = ['project_reader', 'project_admin'] +class ProjectReaderTests(rbac_base.BarbicanV1RbacBase, BarbicanV1RbacSecrets): + + @classmethod + def setup_clients(cls): + super().setup_clients() + cls.client = cls.os_project_reader.secret_v1.SecretClient() def test_create_secret(self): """Test add_secret policy.""" @@ -216,7 +189,7 @@ class ProjectReaderTests(BarbicanV1RbacSecretsBase): 'create_secret', expected_status=exceptions.Forbidden, cleanup='secret') - key = create_aes_key() + key = rbac_base.create_aes_key() expire_time = (datetime.utcnow() + timedelta(days=5)) self.do_request( 'create_secret', expected_status=exceptions.Forbidden, @@ -284,7 +257,7 @@ class ProjectReaderTests(BarbicanV1RbacSecretsBase): sec = self.create_empty_secret_admin('secret_1') uuid = rbac_base._get_uuid(sec['secret_ref']) - key = create_aes_key() + key = rbac_base.create_aes_key() # Associate the payload with the created secret self.do_request( @@ -293,8 +266,12 @@ class ProjectReaderTests(BarbicanV1RbacSecretsBase): ) -class SystemAdminTests(BarbicanV1RbacSecretsBase): - credentials = ['system_admin', 'project_admin'] +class SystemAdminTests(rbac_base.BarbicanV1RbacBase, BarbicanV1RbacSecrets): + + @classmethod + def setup_clients(cls): + super().setup_clients() + cls.client = cls.secret_client def test_create_secret(self): pass @@ -315,8 +292,12 @@ class SystemAdminTests(BarbicanV1RbacSecretsBase): pass -class SystemMemberTests(BarbicanV1RbacSecretsBase): - credentials = ['system_member', 'project_admin'] +class SystemMemberTests(rbac_base.BarbicanV1RbacBase, BarbicanV1RbacSecrets): + + @classmethod + def setup_clients(cls): + super().setup_clients() + cls.client = cls.secret_client def test_create_secret(self): pass @@ -337,8 +318,12 @@ class SystemMemberTests(BarbicanV1RbacSecretsBase): pass -class SystemReaderTests(BarbicanV1RbacSecretsBase): - credentials = ['system_reader', 'project_admin'] +class SystemReaderTests(rbac_base.BarbicanV1RbacBase, BarbicanV1RbacSecrets): + + @classmethod + def setup_clients(cls): + super().setup_clients() + cls.client = cls.secret_client def test_create_secret(self): pass diff --git a/barbican_tempest_plugin/tests/scenario/test_certificate_validation.py b/barbican_tempest_plugin/tests/scenario/test_certificate_validation.py index 5a6b5b7..403d6cc 100644 --- a/barbican_tempest_plugin/tests/scenario/test_certificate_validation.py +++ b/barbican_tempest_plugin/tests/scenario/test_certificate_validation.py @@ -46,6 +46,11 @@ class CertificateValidationTest(barbican_manager.BarbicanScenarioTest): cls.max_microversion, CONF.compute.min_microversion, CONF.compute.max_microversion) + if not CONF.auth.create_isolated_networks: + # FIXME(redorobt): remove this skip when system-scope admin + # issue is fixed. + raise cls.skipException( + 'Certificate Validation tests require isolated networks') @decorators.idempotent_id('b41bc663-5662-4b1e-b8f1-27b2876f16a6') @utils.services('compute', 'image') diff --git a/barbican_tempest_plugin/tests/scenario/test_ephemeral_disk_encryption.py b/barbican_tempest_plugin/tests/scenario/test_ephemeral_disk_encryption.py index ee1bda5..1bc70d6 100644 --- a/barbican_tempest_plugin/tests/scenario/test_ephemeral_disk_encryption.py +++ b/barbican_tempest_plugin/tests/scenario/test_ephemeral_disk_encryption.py @@ -42,6 +42,11 @@ class EphemeralStorageEncryptionTest(barbican_manager.BarbicanScenarioTest): if not CONF.ephemeral_storage_encryption.enabled: raise cls.skipException( 'Ephemeral storage encryption is not supported') + if not CONF.auth.create_isolated_networks: + # FIXME(redorobt): remove this skip when system-scope admin + # issue is fixed. + raise cls.skipException( + 'Ephemeral storage encryption requires isolated networks') @classmethod def resource_setup(cls): diff --git a/barbican_tempest_plugin/tests/scenario/test_volume_encryption.py b/barbican_tempest_plugin/tests/scenario/test_volume_encryption.py index f9191bf..9241283 100644 --- a/barbican_tempest_plugin/tests/scenario/test_volume_encryption.py +++ b/barbican_tempest_plugin/tests/scenario/test_volume_encryption.py @@ -48,6 +48,11 @@ class VolumeEncryptionTest(barbican_manager.BarbicanScenarioTest): super(VolumeEncryptionTest, cls).skip_checks() if not CONF.compute_feature_enabled.attach_encrypted_volume: raise cls.skipException('Encrypted volume attach is not supported') + if not CONF.auth.create_isolated_networks: + # FIXME(redorobt): remove this skip when system-scope admin + # issue is fixed. + raise cls.skipException( + 'Volume encryption requires isolated networks') @classmethod def resource_setup(cls):