diff --git a/enhanced-policies/test-requirements.txt b/enhanced-policies/test-requirements.txt new file mode 100644 index 00000000..e71bb097 --- /dev/null +++ b/enhanced-policies/test-requirements.txt @@ -0,0 +1,15 @@ +pytest==6.2.2 +openstacksdk==0.55.0 +os-client-config==2.1.0 + +#Nova +python-novaclient==17.4.0 + +#Cinder +python-cinderclient==7.3.0 + +#Glance +python-glanceclient==3.3.0 + +# Neutron +netaddr \ No newline at end of file diff --git a/enhanced-policies/tests/__init__.py b/enhanced-policies/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/enhanced-policies/tests/conftest.py b/enhanced-policies/tests/conftest.py new file mode 100644 index 00000000..f6d1de4e --- /dev/null +++ b/enhanced-policies/tests/conftest.py @@ -0,0 +1,147 @@ +# +# Copyright (c) 2021 Wind River Systems, Inc. +# +# SPDX-License-Identifier: Apache-2.0 +# +# All Rights Reserved. +# + +from pytest import fixture + +from tests.fv_rbac import debug1 +from tests.fv_rbac import OpenStackNetworkingSetup +from tests.fv_rbac import OpenStackTestingSetup + + +def pytest_addoption(parser): + parser.addoption( + "--env", + action="store", + default="stx", + help="Environment to run tests against (stx or wro). Default: stx" + ) + + +@fixture(scope="session") +def env(request): + return request.config.getoption("--env") + + +@fixture(scope='session') +def rbac_setup(request): + + if debug1: + print("\nSession Initialization") + + cfg = OpenStackTestingSetup() + + # Create projects + cfg._create_project("project1", description="project1 for rbac test1") + cfg._create_project("project2", description="project2 for rbac test1") + + # NOTE(tbrito): assume roles are already created + # Create roles + # for role in ["project_readonly", "project_admin"]: + # cfg._create_role(role) + + # Create users + for user in cfg.users: + cfg._create_user(user) + + # Assign Roles to Users + cfg._grant_role("project_admin", "user11", "project1") + cfg._grant_role("member", "user12", "project1") + cfg._grant_role("project_readonly", "user13", "project1") + cfg._grant_role("admin", "user02", "project2") + cfg._grant_role("project_admin", "user21", "project2") + cfg._grant_role("member", "user22", "project2") + cfg._grant_role("project_readonly", "user23", "project2") + + image = cfg._create_admin_image() + + def teardown(): + + if debug1: + print("\nSession Teardown") + + cfg._delete_admin_image(image) + + cfg._revoke_role("project_admin", "user11", "project1") + cfg._revoke_role("member", "user12", "project1") + cfg._revoke_role("project_readonly", "user13", "project1") + cfg._revoke_role("admin", "user02", "project2") + cfg._revoke_role("project_admin", "user21", "project2") + cfg._revoke_role("member", "user22", "project2") + cfg._revoke_role("project_readonly", "user23", "project2") + + for user in cfg.users: + cfg._delete_user(user) + + # NOTE(tbrito): Roles should NOT be removed on a live deployment + # for role in ["project_readonly", "project_admin"]: + # cfg._delete_role(role) + + for project in ["project1", "project2"]: + cfg._delete_project(project) + + request.addfinalizer(teardown) + + return cfg + + +@fixture(scope='session') +def network_admin_setup(request, rbac_setup, env): + + cfg = OpenStackNetworkingSetup(env) + + # Create segment ranges based on projects + cfg._create_network_segment_range( + "group0-ext-r0", + shared=True, + network_type="vlan", + physical_network="group0-data0", + minimum=10, maximum=10 + ) + + cfg._create_network_segment_range( + "group0-data0-r0", + project_name="project1", + shared=False, + network_type="vlan", + physical_network="group0-data0", + minimum=400, maximum=499 + ) + + cfg._create_network_segment_range( + "group0-data0b-r0", + shared=True, + network_type="vlan", + physical_network="group0-data0", + minimum=500, maximum=599 + ) + + cfg._create_network_segment_range( + "group0-data1-r0", + project="project2", + shared=False, + network_type="vlan", + physical_network="group0-data1", + minimum=600, maximum=699 + ) + + if env == "wro": + cfg._create_qos("admin-qos", weight=16, + description="External Network Policy") + + def network_admin_teardown(): + cfg._delete_network_segment_range("group0-data1-r0") + cfg._delete_network_segment_range("group0-data0b-r0") + cfg._delete_network_segment_range("group0-data0-r0") + cfg._delete_network_segment_range("group0-ext-r0") + + if env == "wro": + cfg._delete_qos("admin-qos") + + request.addfinalizer(network_admin_teardown) + + return cfg diff --git a/enhanced-policies/tests/fv_rbac.py b/enhanced-policies/tests/fv_rbac.py new file mode 100644 index 00000000..56911b8b --- /dev/null +++ b/enhanced-policies/tests/fv_rbac.py @@ -0,0 +1,636 @@ +# +# Copyright (c) 2021 Wind River Systems, Inc. +# +# SPDX-License-Identifier: Apache-2.0 +# +# All Rights Reserved. +# + +import os + +from cinderclient import client as CinderClient +from glanceclient import Client as GlanceClient +from keystoneauth1 import loading +from keystoneauth1 import session +from novaclient import client as NovaClient +import openstack +import os_client_config +import pytest + +TEST_CLOUD = os.getenv("OS_CLOUD") + +debug = False # For general issues +debug1 = True # For teardown issues +debug2 = False # For temporary issues + + +class OpenStackTestingSetup: + + def __init__(self): + self.os_sdk_admin_conn = openstack.connect(cloud=TEST_CLOUD) + + self.user11 = { + "name": "user11", + "password": "User11@Project1", + "project": "project1" + } + self.user12 = { + "name": "user12", + "password": "User12@Project1", + "project": "project1" + } + self.user13 = { + "name": "user13", + "password": "User13@Project1", + "project": "project1" + } + self.user02 = { + "name": "user02", + "password": "user02@Project2", + "project": "project2" + } + self.user21 = { + "name": "user21", + "password": "User21@Project2", + "project": "project2" + } + self.user22 = { + "name": "user22", + "password": "User22@Project2", + "project": "project2" + } + self.user23 = { + "name": "user23", + "password": "User23@Project2", + "project": "project2" + } + + self.users = ( + self.user11, + self.user12, + self.user13, + self.user02, + self.user21, + self.user22, + self.user23 + ) + + def _get_project(self, name): + return self.os_sdk_admin_conn.get_project(name) + + def _create_project(self, name, description): + project = self._get_project(name) + if project is None: + return self.os_sdk_admin_conn.create_project( + name, + domain_id="default", + description=description + ) + return project + + def _delete_project(self, name): + self.os_sdk_admin_conn.delete_project(name) + + def _create_role(self, name): + role = self.os_sdk_admin_conn.get_role(name) + if role is None: + return self.os_sdk_admin_conn.create_role(name) + return role + + def _delete_role(self, name): + self.os_sdk_admin_conn.delete_role(name) + + def _grant_role(self, name, user_name, project_name): + self.os_sdk_admin_conn.grant_role( + name, + user=user_name, + project=project_name + ) + + def _revoke_role(self, name, user_name, project_name): + self.os_sdk_admin_conn.revoke_role( + name, + user=user_name, + project=project_name + ) + + def _create_user(self, user): + user_obj = self.os_sdk_admin_conn.identity.find_user(user.get("name")) + if user_obj is None: + return self.os_sdk_admin_conn.identity.create_user( + name=user.get("name"), + password=user.get("password"), + default_project=user.get("project")) + return user_obj + + def _delete_user(self, user): + self.os_sdk_admin_conn.delete_user(user.get("name")) + + def _create_admin_image(self): + # wget http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-disk.img + image = self.os_sdk_admin_conn.image.find_image("cirros") + if not image: + return self.os_sdk_admin_conn.create_image( + "cirros", + filename="cirros-0.3.4-x86_64-disk.img", + disk_format="qcow2", + container_format="bare", + wait=True, + visibility="public" + ) + return image + + def _delete_admin_image(self, image): + if image: + self.os_sdk_admin_conn.delete_image(image.id) + + +class OpenStackNetworkingSetup(OpenStackTestingSetup): + + def __init__(self, env): + super(OpenStackNetworkingSetup, self).__init__() + self.env = env + + def _create_network_segment_range(self, name, project_name=None, **kwargs): + sr = self.os_sdk_admin_conn.network.find_network_segment_range(name) + if sr is None: + project_id = None + if project_name: + project_id = self.os_sdk_admin_conn.get_project( + project_name).id + + if project_id is None: + return self.os_sdk_admin_conn.network. \ + create_network_segment_range(name=name, **kwargs) + else: + return self.os_sdk_admin_conn.network. \ + create_network_segment_range( + name=name, + project_id=project_id, + **kwargs + ) + return sr + + def _delete_network_segment_range(self, name_or_id): + return self.os_sdk_admin_conn.network.delete_network(name_or_id) + + +class OpenStackRouterInterface: + def __init__(self, router_name_or_id, subnet_name_or_id): + self.router_name_or_id = router_name_or_id + self.subnet_name_or_id = subnet_name_or_id + + +class OpenStackBasicTesting(): + os_sdk_conn = None + os_sdk_admin_conn = None + + # ------------------------------------------------------------------------- + # Tear down + # ------------------------------------------------------------------------- + + instances_clearing = list() + images_clearing = list() + snapshots_clearing = list() + volumes_clearing = list() + volume_bkps_clearing = list() + security_groups_clearing = list() + floating_ips_clearing = list() + interfaces_clearing = list() + routers_clearing = list() + trunks_clearing = list() + ports_clearing = list() + subnets_clearing = list() + networks_clearing = list() + subnet_pools_clearing = list() + address_scopes_clearing = list() + + @pytest.fixture(scope='function') + def tc_teardown(self, request): + + def teardown(): + + if debug1: + print("\nTC Teardown") + + self.os_sdk_conn = self.os_sdk_admin_conn + for instance_id in self.instances_clearing: + self._delete_server(instance_id, autoclear=False) + for image_id in self.images_clearing: + self._delete_image(image_id, autoclear=False) + for snap_id in self.snapshots_clearing: + self._delete_snapshot(snap_id, autoclear=False) + for vol_id in self.volumes_clearing: + self._delete_volume(vol_id, autoclear=False) + for bkp_id in self.volume_bkps_clearing: + self._delete_volume_backup(bkp_id, autoclear=False) + for sg_id in self.security_groups_clearing: + self._delete_security_group(sg_id, autoclear=False) + for fip_id in self.floating_ips_clearing: + self._delete_floatingip(fip_id, autoclear=False) + for ri in self.interfaces_clearing: + self._delete_interface_from_router(ri, autoclear=False) + for router_id in self.routers_clearing: + self._delete_router(router_id, autoclear=False) + for trunk_id in self.trunks_clearing: + self._delete_trunk(trunk_id, autoclear=False) + for port_id in self.ports_clearing: + self._delete_port(port_id, autoclear=False) + for subnet_id in self.subnets_clearing: + self._delete_subnet(subnet_id, autoclear=False) + for network_id in self.networks_clearing: + self._delete_network(network_id, autoclear=False) + for subnet_pool_id in self.subnet_pools_clearing: + self._delete_subnetpool(subnet_pool_id, autoclear=False) + for address_scope_id in self.address_scopes_clearing: + self._delete_addrscope(address_scope_id, autoclear=False) + + self.instances_clearing.clear() + self.images_clearing.clear() + self.snapshots_clearing.clear() + self.volumes_clearing.clear() + self.volume_bkps_clearing.clear() + self.security_groups_clearing.clear() + self.floating_ips_clearing.clear() + self.interfaces_clearing.clear() + self.routers_clearing.clear() + self.trunks_clearing.clear() + self.ports_clearing.clear() + self.subnets_clearing.clear() + self.networks_clearing.clear() + self.subnet_pools_clearing.clear() + self.address_scopes_clearing.clear() + + request.addfinalizer(teardown) + + return + + @pytest.fixture(scope='class') + def create_external_network(self, request): + self.set_connections_for_user(self.user02) + + args = {'router:external': True} + network = self._create_network( + "extnet21", + shared=True, + autoclear=False, + **args) + assert network is not None + assert "extnet21" in [n.name for n in self._list_networks()] + + subnet = self._create_subnet( + "extsubnet21", + "extnet21", + cidr="192.168.195.0/24", + gateway_ip="192.168.195.1", autoclear=False + ) + assert subnet is not None + subnet = self._get_subnet("extsubnet21") + + yield network, subnet + + self.set_connections_for_user(self.user02) + self._delete_subnet("extsubnet21", autoclear=False) + self._delete_network("extnet21", autoclear=False) + + @pytest.fixture(scope='class') + def create_router_vr11(self, request, create_external_network): + extnet, extsubnet = create_external_network + + self.set_connections_for_user(self.user11) + vr11 = self._create_router("vr11", extnet.name, autoclear=False) + vr11 = self._get_router("vr11") + assert vr11 is not None + + yield vr11 + + self.set_connections_for_user(self.user11) + self._delete_router("vr11", autoclear=False) + + @pytest.fixture(scope='class') + def create_router_vr21(self, request, create_external_network): + extnet, extsubnet = create_external_network + + self.set_connections_for_user(self.user02) + vr21 = self._create_router("vr21", extnet.name, autoclear=False) + vr21 = self._get_router("vr21") + assert vr21 is not None + + yield vr21 + + self.set_connections_for_user(self.user02) + self._delete_router("vr21", autoclear=False) + + def _get_conn_for(self, user): + conn = self.os_sdk_admin_conn + user_obj = conn.identity.find_user(user.get("name")) + project = conn.get_project(user.get("project")) + + return openstack.connection.Connection( + auth=dict( + auth_url=conn.auth.get("auth_url"), + username=user.get("name"), + password=user.get("password"), + project_id=project.id, + user_domain_id=user_obj.domain_id + ) + ) + + # ------------------------------------------------------------------------- + # API Session Connection + # ------------------------------------------------------------------------- + # OpenStack Python API + def _get_session_for_user(self, user): + creds = os_client_config.OpenStackConfig() \ + .get_one_cloud(cloud=TEST_CLOUD).get_auth_args() + sloader = loading.get_plugin_loader("password") + auth = sloader.load_from_options( + auth_url=self.os_sdk_admin_conn.auth.get("auth_url"), + username=user.get("name"), + password=user.get("password"), + project_name=user.get("project"), + project_domain_name=creds['project_domain_name'], + user_domain_name=creds['user_domain_name'], + ) + return session.Session(auth=auth) + + # CinderClient Python API + def _get_cclient_for(self, user): + self.sess = self._get_session_for_user(user) + return CinderClient.Client('3', session=self.sess, http_log_debug=True) + + # GlanceClient Python API + def _get_gclient_for(self, user): + self.sess = self._get_session_for_user(user) + return GlanceClient('2', session=self.sess) + + # NovaClient Python API + def _get_nclient_for(self, user): + self.sess = self._get_session_for_user(user) + return NovaClient.Client('2', session=self.sess) + + def _get_os_conn_for(self, user): + self.sess = self._get_session_for_user(user) + return openstack.connection.Connection(session=self.sess) + + def set_connections_for_user(self, user): + self.os_sdk_conn = self._get_os_conn_for(user) + self.nc = self._get_nclient_for(user) + self.cc = self._get_cclient_for(user) + self.gc = self._get_gclient_for(user) + + # ------------------------------------------------------------------------- + # Volume methods - Cinder + # ------------------------------------------------------------------------- + + def _create_volume(self, volume_name, autoclear=True): + vol = self.os_sdk_conn.block_storage.create_volume( + name=volume_name, + size=1, + image="cirros", + wait=True + ) + if debug1: + print("created volume: " + vol.name + " id: " + vol.id) + if autoclear: + self.volumes_clearing.append(vol.id) + self.os_sdk_conn.block_storage.wait_for_status(vol, status="available") + return vol + + def _create_image_from_volume(self, volume_name, image_name, + autoclear=True): + volume = self._get_volume(volume_name) + self.cc.volumes.upload_to_image( + volume, + False, + image_name, + container_format="bare", + disk_format="raw" + ) + image = self._get_image(image_name) + if debug1: + print("created image: " + image.name + " id: " + image.id) + if autoclear: + self.images_clearing.append(image.id) + return image + + def _delete_volume(self, volume_name, autoclear=True, **kwargs): + vol = self.os_sdk_conn.block_storage.find_volume( + volume_name, + ignore_missing=False + ) + if vol: + self.os_sdk_conn.block_storage.delete_volume(vol.id) + if debug1: + print("deleted volume: " + vol.name + " id: " + vol.id) + if autoclear: + self.volumes_clearing.remove(vol.id) + + def _list_volumes(self): + volumes = self.cc.volumes.list() + return volumes + + def _get_volume(self, volume_name_or_id): + volume = self._find_volume(volume_name_or_id, ignore_missing=False) + return self.os_sdk_conn.block_storage.get_volume(volume) + + def _find_volume(self, volume_name_or_id, ignore_missing=True): + return self.os_sdk_conn.block_storage.find_volume( + volume_name_or_id, + ignore_missing=ignore_missing + ) + + def _update_volume(self, volume_name, **kwargs): + vol = self.os_sdk_conn.update_volume(volume_name, **kwargs) + return vol + + def _get_volume_metadata(self, volume_name): + vol = self.cc.volumes.get(self._get_volume(volume_name).id) + # NOTE(tbrito): cinderclient doesn't call + # /v3/{project_id}/volumes/{volume_id}/metadata explicitly + return vol.metadata + + def _update_volume_metadata(self, volume_name, metadata): + vol = self.cc.volumes.get(self._get_volume(volume_name).id) + # TODO(tbrito): Refactor after + # https://review.opendev.org/c/openstack/openstacksdk/+/777801 merges + return self.cc.volumes.set_metadata(vol, metadata) + + def _delete_volume_metadata(self, volume_name, metadata_keys: list): + vol = self.cc.volumes.get(self._get_volume(volume_name).id) + return self.cc.volumes.delete_metadata(vol, metadata_keys) + + def _set_volume_readonly_flag(self, volume_name, readonly=True): + vol = self.cc.volumes.get(self._get_volume(volume_name).id) + return self.cc.volumes.update_readonly_flag(vol, readonly) + + def _retype_volume(self, volume_name, volume_type, + migration_policy="never"): + vol = self.cc.volumes.get(self._get_volume(volume_name).id) + return self.cc.volumes.retype(vol, volume_type, migration_policy) + + def _extend_volume(self, volume_name, size): + vol = self._get_volume(volume_name) + # NOTE(tbrito): Can't use SDK method to extend because it doesn't + # raise exceptions, only get message + # self.os_sdk_conn.block_storage.extend_volume(vol, size=size) + self.cc.volumes.extend(vol, size) + vol = self.os_sdk_conn.block_storage.get_volume(vol) + self.os_sdk_conn.block_storage.wait_for_status(vol, status="available") + return self._get_volume(volume_name) + + def _add_volume_to_server(self, instance_name_or_id, volume_name_or_id): + instance = self._get_server(instance_name_or_id) + volume = self._get_volume(volume_name_or_id) + self.os_sdk_conn.compute.create_volume_attachment(instance, + volume_id=volume.id) + self.os_sdk_conn.block_storage.wait_for_status(volume, status="in-use") + + def _remove_volume_from_server(self, instance_name_or_id, + volume_name_or_id): + instance = self._get_server(instance_name_or_id) + volume = self._get_volume(volume_name_or_id) + for attached_volume in instance.attached_volumes: + if attached_volume.get("id") == volume.id: + self.os_sdk_conn.compute.delete_volume_attachment( + attached_volume.get("id"), + instance + ) + self.os_sdk_conn.block_store.wait_for_status( + volume, + status='available', + failures=['error'], + wait=6 * 60 + ) + + # ------------------------------------------------------------------------- + # Volume transfer methods + # ------------------------------------------------------------------------- + def _start_volume_transfer(self, volume_name, transfer_name): + volume = self._get_volume(volume_name) + return self.cc.transfers.create(volume.id, transfer_name) + + def _get_volume_transfer(self, transfer_name): + return self.cc.transfers.get( + self.cc.transfers.find(name=transfer_name).id + ) + + def _accept_volume_transfer(self, transfer_id, auth_key): + return self.cc.transfers.accept(transfer_id, auth_key) + + def _list_volume_transfers(self): + return self.cc.transfers.list() + + def _delete_volume_transfer(self, transfer_name): + return self.cc.transfers.delete( + self.cc.transfers.find(name=transfer_name).id + ) + + # ------------------------------------------------------------------------- + # Volume backup methods + # ------------------------------------------------------------------------- + + def _create_volume_backup(self, volume_name, backup_name, autoclear=True): + vol = self._get_volume(volume_name) + bkp = self.os_sdk_conn.block_storage.create_backup( + volume_id=vol.id, + name=backup_name + ) + if debug1: + print("created volume backup: " + bkp.name + " id: " + bkp.id) + if autoclear: + self.volume_bkps_clearing.append(bkp.id) + self.os_sdk_conn.block_storage.wait_for_status(bkp, status="available") + return bkp + + def _delete_volume_backup(self, backup_name, autoclear=True): + bkp = self._get_volume_backup(backup_name) + if bkp: + self.os_sdk_conn.block_storage.delete_backup(bkp.id) + if debug1: + print("deleted volume backup: " + bkp.name + " id: " + bkp.id) + if autoclear: + self.volume_bkps_clearing.remove(bkp.id) + + def _restore_volume_backup(self, backup_name, new_volume_name, + autoclear=True): + bkp = self._get_volume_backup(backup_name) + self.os_sdk_conn.block_storage.restore_backup(bkp, + name=new_volume_name) + bkp = self._get_volume_backup(backup_name) + self.os_sdk_conn.block_storage.wait_for_status(bkp, status="available") + vol = self._get_volume(new_volume_name) + if autoclear: + self.volumes_clearing.append(vol.id) + return vol + + def _get_volume_backup(self, backup_name): + return self.os_sdk_conn.block_storage.get_backup( + self.os_sdk_conn.block_storage.find_backup(backup_name)) + + # ------------------------------------------------------------------------- + # Snapshot methods + # ------------------------------------------------------------------------- + def _create_snapshot(self, volume_name, name, autoclear=True, **kwargs): + volume = self._get_volume(volume_name) + snapshot = self.os_sdk_conn.block_storage.create_snapshot( + volume_id=volume.id, + name=name, + **kwargs + ) + if debug1: + print("created snapshot: " + snapshot.name + " id: " + snapshot.id) + if autoclear: + self.snapshots_clearing.append(snapshot.id) + self.os_sdk_conn.block_storage.wait_for_status( + snapshot, + status="available" + ) + return snapshot + + def _delete_snapshot(self, snapshot_name, autoclear=True): + snapshot = self.os_sdk_conn.block_storage.find_snapshot(snapshot_name) + if snapshot: + self.os_sdk_conn.block_storage.delete_snapshot(snapshot.id) + if debug1: + print("deleted snapshot: " + snapshot.name + " id: " + + snapshot.id) + if autoclear: + self.snapshots_clearing.remove(snapshot.id) + + def _list_snapshots(self): + return self.os_sdk_conn.block_storage.snapshots() + + def _get_snapshot(self, snapshot_name): + return self.os_sdk_conn.block_storage.get_snapshot( + self.os_sdk_conn.block_storage.find_snapshot( + snapshot_name, + ignore_missing=False + ) + ) + + def _update_snapshot(self, snapshot_name, **kwargs): + snapshot = self.cc.volume_snapshots.get( + self._get_snapshot(snapshot_name).id + ) + self.cc.volume_snapshots.update(snapshot, **kwargs) + + def _get_snapshot_metadata(self, snapshot_name): + return self.os_sdk_conn.block_storage.get_snapshot( + self.os_sdk_conn.block_storage.find_snapshot( + snapshot_name, + ignore_missing=False + ) + ).metadata + + def _update_snapshot_metadata(self, snapshot_name, metadata): + snapshot = self.cc.volume_snapshots.get( + self._get_snapshot(snapshot_name).id + ) + self.cc.volume_snapshots.set_metadata(snapshot, metadata) + + def _delete_snapshot_metadata(self, snapshot_name, *metadata_keys): + snapshot = self.cc.volume_snapshots.get( + self._get_snapshot(snapshot_name).id + ) + self.cc.volume_snapshots.delete_metadata(snapshot, metadata_keys) diff --git a/enhanced-policies/tests/rbac_test_base.py b/enhanced-policies/tests/rbac_test_base.py new file mode 100644 index 00000000..787a82c8 --- /dev/null +++ b/enhanced-policies/tests/rbac_test_base.py @@ -0,0 +1,424 @@ +# +# Copyright (c) 2021 Wind River Systems, Inc. +# +# SPDX-License-Identifier: Apache-2.0 +# +# All Rights Reserved. +# + +import os +import unittest + +from cinderclient import client +from glanceclient import Client +from keystoneauth1 import loading +from keystoneauth1 import session +import openstack +import os_client_config +import pytest + +TEST_CLOUD = os.getenv("OS_CLOUD") + + +@pytest.mark.usefixtures("rbac_setup") +class TestClass(unittest.TestCase): + os_sdk_conn = None + + def setUp(self): + print("\nTest initialization") + + self.volumes_cleanup = [] + self.snapshots_cleanup = [] + self.instances_cleanup = [] + self.volume_bkps_cleanup = [] + self.images_cleanup = [] + + self.os_sdk_admin_conn = openstack.connect(cloud=TEST_CLOUD) + + # Create users + self.user11 = { + "name": "user11", + "password": "User11@Project1", + "project": "project1" + } + self.user12 = { + "name": "user12", + "password": "User12@Project1", + "project": "project1" + } + self.user13 = { + "name": "user13", + "password": "User13@Project1", + "project": "project1" + } + self.user21 = { + "name": "user21", + "password": "User21@Project2", + "project": "project2" + } + self.user22 = { + "name": "user22", + "password": "User22@Project2", + "project": "project2" + } + self.user23 = { + "name": "user23", + "password": "User23@Project2", + "project": "project2" + } + + self.users = ( + self.user11, + self.user12, + self.user13, + self.user21, + self.user22, + self.user23 + ) + + # Tear down + def tearDown(self): + for instance_id in self.instances_cleanup: + instance = self.os_sdk_admin_conn.compute.find_server(instance_id) + self.os_sdk_admin_conn.compute.delete_server(instance, force=True) + self.os_sdk_admin_conn.compute.wait_for_delete(instance) + + for image_id in self.images_cleanup: + image = self.os_sdk_admin_conn.image.find_image(image_id) + self.os_sdk_admin_conn.image.delete_image(image) + + for snap_id in self.snapshots_cleanup: + snap = self.os_sdk_admin_conn.block_storage.find_snapshot(snap_id) + self.os_sdk_admin_conn.block_storage.delete_snapshot(snap) + self.os_sdk_admin_conn.block_storage.wait_for_delete(snap) + + for bkp_id in self.volume_bkps_cleanup: + bkp = self.os_sdk_admin_conn.block_storage.find_backup(bkp_id) + self.os_sdk_admin_conn.delete_volume_backup(bkp) + self.os_sdk_admin_conn.block_storage.wait_for_delete(bkp) + + for vol_id in self.volumes_cleanup: + vol = self.os_sdk_admin_conn.block_storage.find_volume(vol_id) + self.os_sdk_admin_conn.delete_volume(vol, force=True) + self.os_sdk_admin_conn.block_storage.wait_for_delete(vol) + + def _get_session_for_user(self, user): + creds = os_client_config.OpenStackConfig()\ + .get_one_cloud(cloud=TEST_CLOUD)\ + .get_auth_args() + loader = loading.get_plugin_loader("password") + auth = loader.load_from_options( + auth_url=self.os_sdk_admin_conn.auth.get("auth_url"), + username=user.get("name"), + password=user.get("password"), + project_name=user.get("project"), + project_domain_name=creds['project_domain_name'], + user_domain_name=creds['user_domain_name'], + ) + return session.Session(auth=auth) + + def _get_conn_for(self, user): + self.sess = self._get_session_for_user(user) + return openstack.connection.Connection(session=self.sess) + + #Cinder + def _get_cclient_for(self, user): + self.sess = self._get_session_for_user(user) + return client.Client('3', session=self.sess, http_log_debug=True) + + #Glance + def _get_gclient_for(self, user): + self.sess = self._get_session_for_user(user) + return Client('2', session=self.sess) + + def set_connections_for_user(self, user): + self.os_sdk_conn = self._get_conn_for(user) + self.cinderclient = self._get_cclient_for(user) + self.gc = self._get_gclient_for(user) + + # Volume methods + def _create_volume(self, volume_name): + vol = self.os_sdk_conn.block_storage.create_volume( + name=volume_name, + size=1, + image="cirros", + wait=True + ) + self.os_sdk_conn.block_storage.wait_for_status(vol, status="available") + self.volumes_cleanup.append(vol.id) + return vol + + def _list_volumes(self): + volumes = self.cinderclient.volumes.list() + return volumes + + def _get_volume(self, volume_name): + volume = self.os_sdk_conn.block_storage.find_volume( + volume_name, + ignore_missing=False + ) + return self.os_sdk_conn.block_storage.get_volume(volume) + + def _update_volume(self, volume_name, **kwargs): + vol = self.os_sdk_conn.update_volume(volume_name, **kwargs) + return vol + + def _get_volume_metadata(self, volume_name): + vol = self.cinderclient.volumes.get(self._get_volume(volume_name).id) + # NOTE(tbrito): cinderclient doesn't call /v3/{project_id}/volumes/{volume_id}/metadata explicitly + return vol.metadata + + def _update_volume_metadata(self, volume_name, metadata): + vol = self.cinderclient.volumes.get(self._get_volume(volume_name).id) + # TODO: Refactor after https://review.opendev.org/c/openstack/openstacksdk/+/777801 merges + return self.cinderclient.volumes.set_metadata(vol, metadata) + + def _delete_volume_metadata(self, volume_name, metadata_keys: list): + vol = self.cinderclient.volumes.get(self._get_volume(volume_name).id) + return self.cinderclient.volumes.delete_metadata(vol, metadata_keys) + + def _set_volume_readonly_flag(self, volume_name, readonly=True): + vol = self.cinderclient.volumes.get(self._get_volume(volume_name).id) + return self.cinderclient.volumes.update_readonly_flag(vol, readonly) + + def _retype_volume(self, volume_name, volume_type, migration_policy="never"): + vol = self.cinderclient.volumes.get(self._get_volume(volume_name).id) + return self.cinderclient.volumes.retype(vol, volume_type, migration_policy) + + def _extend_volume(self, volume_name, size): + vol = self._get_volume(volume_name) + # NOTE(tbrito): Can't use SDK method to extend because it doesn't raise + # exceptions, only get message + # self.os_sdk_conn.block_storage.extend_volume(vol, size=size) + self.cinderclient.volumes.extend(vol, size) + vol = self.os_sdk_conn.block_storage.get_volume(vol) + self.os_sdk_conn.block_storage.wait_for_status(vol, status="available") + return self._get_volume(volume_name) + + def _delete_volume(self, volume_name, **kwargs): + vol = self.os_sdk_conn.block_storage.find_volume(volume_name, ignore_missing=False) + self.os_sdk_conn.block_storage.delete_volume(vol) + self.volumes_cleanup.remove(vol.id) + + # Volume transfer methods + def _start_volume_transfer(self, volume_name, transfer_name): + volume = self._get_volume(volume_name) + return self.cinderclient.transfers.create(volume.id, transfer_name) + + def _get_volume_transfer(self, transfer_name): + return self.cinderclient.transfers.get( + self.cinderclient.transfers.find(name=transfer_name).id + ) + + def _accept_volume_transfer(self, transfer_id, auth_key): + return self.cinderclient.transfers.accept(transfer_id, auth_key) + + def _list_volume_transfers(self): + return self.cinderclient.transfers.list() + + def _delete_volume_transfer(self, transfer_name): + return self.cinderclient.transfers.delete( + self.cinderclient.transfers.find(name=transfer_name).id + ) + + # Volume backup methods + def _create_volume_backup(self, volume_name, backup_name): + vol = self._get_volume(volume_name) + bkp = self.os_sdk_conn.block_storage.create_backup( + volume_id=vol.id, + name=backup_name + ) + self.os_sdk_conn.block_storage.wait_for_status(bkp, status="available") + self.volume_bkps_cleanup.append(bkp.id) + return bkp + + def _get_volume_backup(self, backup_name): + return self.os_sdk_conn.block_storage.get_backup(self.os_sdk_conn.block_storage.find_backup(backup_name)) + + def _restore_volume_backup(self, backup_name, new_volume_name): + bkp = self._get_volume_backup(backup_name) + self.os_sdk_conn.block_storage.restore_backup(bkp, name=new_volume_name) + bkp = self._get_volume_backup(backup_name) + self.os_sdk_conn.block_storage.wait_for_status(bkp, status="available") + volume = self._get_volume(new_volume_name) + self.volumes_cleanup.append(volume.id) + return volume + + def _delete_volume_backup(self, backup_name): + bkp = self._get_volume_backup(backup_name) + self.os_sdk_conn.block_storage.delete_backup(bkp) + self.volume_bkps_cleanup.remove(bkp.id) + + # Server methods + def _create_server(self, server_name, image_name, flavor_name): + image = self.os_sdk_conn.image.find_image(image_name) + flavor = self.os_sdk_conn.compute.find_flavor(flavor_name) + server = self.os_sdk_conn.compute.create_server( + name=server_name, + image_id=image.id, + flavor_id=flavor.id, + networks=[], + autoip=False + ) + self.instances_cleanup.append(server.id) + return self.os_sdk_conn.compute.wait_for_server(server) + + def _update_server(self, server, **kwargs): + self.os_sdk_conn.compute.update_server(server, **kwargs) + + def _get_server(self, server_name): + return self.os_sdk_conn.compute.get_server( + self.os_sdk_conn.compute.find_server(server_name) + ) + + def _add_volume_to_server(self, server_name, volume_name): + server = self._get_server(server_name) + volume = self._get_volume(volume_name) + self.os_sdk_conn.compute.create_volume_attachment( + server, + volume_id=volume.id + ) + self.os_sdk_conn.block_storage.wait_for_status(volume, status="in-use") + + def _remove_volume_from_server(self, volume_name, server_name): + server = self._get_server(server_name) + volume = self._get_volume(volume_name) + for attached_volume in server.attached_volumes: + if attached_volume.get("id") == volume.id: + self.os_sdk_conn.compute.delete_volume_attachment( + attached_volume.get("id"), + server + ) + self.os_sdk_conn.block_store.wait_for_status( + volume, + status='available', + failures=['error'], + wait=360 + ) + + # Snapshot methods + def _create_snapshot(self, volume_name, name, **kwargs): + volume = self._get_volume(volume_name) + snapshot = self.os_sdk_conn.block_storage.create_snapshot( + volume_id=volume.id, + name=name, + **kwargs + ) + self.os_sdk_conn.block_storage.wait_for_status( + snapshot, + status="available" + ) + self.snapshots_cleanup.append(snapshot.id) + return snapshot + + def _list_snapshots(self): + return self.os_sdk_conn.block_storage.snapshots() + + def _get_snapshot(self, snapshot_name): + return self.os_sdk_conn.block_storage.get_snapshot( + self.os_sdk_conn.block_storage.find_snapshot( + snapshot_name, + ignore_missing=False + ) + ) + + def _update_snapshot(self, snapshot_name, **kwargs): + snapshot = self.cinderclient.volume_snapshots.get( + self._get_snapshot(snapshot_name).id + ) + self.cinderclient.volume_snapshots.update(snapshot, **kwargs) + + def _get_snapshot_metadata(self, snapshot_name): + return self.os_sdk_conn.block_storage.get_snapshot( + self.os_sdk_conn.block_storage.find_snapshot( + snapshot_name, + ignore_missing=False + ) + ).metadata + + def _update_snapshot_metadata(self, snapshot_name, metadata): + snapshot = self.cinderclient.volume_snapshots.get( + self._get_snapshot(snapshot_name).id + ) + self.cinderclient.volume_snapshots.set_metadata(snapshot, metadata) + + def _delete_snapshot_metadata(self, snapshot_name, *metadata_keys): + snapshot = self.cinderclient.volume_snapshots.get( + self._get_snapshot(snapshot_name).id + ) + self.cinderclient.volume_snapshots.delete_metadata( + snapshot, + metadata_keys + ) + + def _delete_snapshot(self, snapshot_name): + snapshot = self.os_sdk_conn.block_storage.find_snapshot(snapshot_name) + self.os_sdk_conn.block_storage.delete_snapshot(snapshot) + self.snapshots_cleanup.remove(snapshot.id) + + # Image methods - Glance + def _create_image_from_volume(self, volume_name, image_name): + volume = self._get_volume(volume_name) + self.cinderclient.volumes.upload_to_image( + volume, + False, + image_name, + container_format="bare", + disk_format="raw" + ) + image = self._get_image_by_name(image_name) + self.images_cleanup.append(image.id) + return image + + def _get_image_by_name(self, image_name): + return self.os_sdk_conn.image.get_image( + self.os_sdk_conn.image.find_image(image_name, ignore_missing=False) + ) + + def _get_image_by_id(self, image_id): + return self.os_sdk_conn.image.get_image(image_id) + + def _create_image(self, image_name, filename=None, admin=False, + disk_format="qcow2", container_format="bare", + visibility="private", wait=True, timeout=3*60): + os_sdk_conn = self.os_sdk_conn + if admin: + os_sdk_conn = self.os_sdk_admin_conn + image = os_sdk_conn.image.create_image( + name=image_name, + filename=filename, + container_format=container_format, + disk_format=disk_format, + visibility=visibility, + wait=wait, + timeout=timeout + ) + self.images_cleanup.append(image.id) + return image + + def _upload_image(self, image_id, filename): + image = self.gc.images.upload( + image_id, + open(filename, 'rb') + ) + return image + + def _delete_image(self, image): + self.os_sdk_conn.image.delete_image(image) + self.images_cleanup.remove(image.id) + + def _list_images(self): + return self.gc.images.list() + + def _update_image(self, image, **attrs): + self.os_sdk_conn.image.update_image(image, **attrs) + return self._get_image_by_id(image.id) + + def _download_image(self, image): + return self.os_sdk_conn.image.download_image(image, stream=True) + + def _deactivate_image(self, image): + self.os_sdk_conn.image.deactivate_image(image.id) + + def _reactivate_image(self, image): + self.os_sdk_conn.image.reactivate_image(image.id) diff --git a/enhanced-policies/tests/test_cinder/__init__.py b/enhanced-policies/tests/test_cinder/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/enhanced-policies/tests/test_cinder/test_rbac_cinder.py b/enhanced-policies/tests/test_cinder/test_rbac_cinder.py new file mode 100644 index 00000000..526fc4e6 --- /dev/null +++ b/enhanced-policies/tests/test_cinder/test_rbac_cinder.py @@ -0,0 +1,698 @@ +# +# Copyright (c) 2021 Wind River Systems, Inc. +# +# SPDX-License-Identifier: Apache-2.0 +# +# All Rights Reserved. +# + +from tests import rbac_test_base + +class TestBlockStorage(rbac_test_base.TestClass): + + def test_uc_volume_1(self): + """ + 1. user11 can create a volume from an image + 2. usr11 can list/detail the volume + 3. user11 can create metadata on the volume + 4. user11 can update/delete metadata + 5. user11 cant extend the volume when detached + 6. user11 can update readonly flag of the volume + 7. user11 can retype the volume + 8. user11 can attach/dettach the volume + """ + self.set_connections_for_user(self.user11) + + # 1. user11 can create a volume from an image + self._create_volume("volume11") + # 2. usr11 can list/detail the volume + volumes = self._list_volumes() + self.assertIn("volume11", [v.name for v in volumes]) + + # 3. user11 can create metadata on the volume + self._update_volume_metadata("volume11", metadata={"my": "test"}) + volume11 = self._get_volume("volume11") + self.assertIn("my", volume11.metadata) + self.assertEqual(volume11.metadata.get("my"), "test") + + # 4. user11 can update/delete metadata + self._update_volume_metadata("volume11", metadata={"my": "test2"}) + volume11 = self._get_volume("volume11") + self.assertIn("my", volume11.metadata) + self.assertEqual(volume11.metadata.get("my"), "test2") + + self._delete_volume_metadata("volume11", ["my"]) + volume11 = self._get_volume("volume11") + self.assertNotIn("my", volume11.metadata) + + # 5. user11 cant extend the volume when detached + volume11 = self._extend_volume("volume11", 2) + self.assertEqual(volume11.size, 2) + + # 6. user11 can update readonly flag of the volume + # TODO(tbrito): Fix after merge of https://review.opendev.org/c/openstack/openstacksdk/+/776266 + self._set_volume_readonly_flag("volume11", readonly=True) + volume11 = self._get_volume("volume11") + self.assertTrue(volume11.metadata.get("readonly")) + + # 7. user11 can retype the volume + # TODO(tbrito): Fix after merge of https://review.opendev.org/c/openstack/openstacksdk/+/776272 + self._retype_volume("volume11", volume_type="rbd1") + volume11 = self._get_volume("volume11") + # TODO(tbrito): Req accepted but volume doesn't change? Figure out why + # self.assertEquals(volume11.volume_type, "rbd1") + + # 8. user11 can attach/detach the volume + self._create_server("instance11", image_name="cirros", flavor_name="m1.tiny") + self._add_volume_to_server("instance11", "volume11") + instance11 = self._get_server("instance11") + self.assertIn(volume11.id, [v.get("id") for v in instance11.attached_volumes]) + + self._remove_volume_from_server("volume11", "instance11") + instance11 = self._get_server("instance11") + self.assertEqual(instance11.attached_volumes, []) + + def test_uc_volume_2(self): + """ + 1. user12 can create volume from an image + 2. user12 cannot delete the volume + 3. use12 can list/details the volume it created + 4. user 12 can create metadata of the volumes + 5. user 12 can not update/delete metadata of the volumes + 6. user12 can get list/detail of metadata of volumes of project1 + 7. user12 cannot extend the volume + 8. user12 can attach/detach the volume to an instance + """ + self.set_connections_for_user(self.user12) + + # 1. user12 can create volume form an image + self._create_volume("volume12") + volumes = self._list_volumes() + self.assertIn("volume12", [v.name for v in volumes]) + + # 2. user12 cannot delete the volume + self.assertRaisesRegex( + Exception, + "Policy doesn't allow volume:delete to be performed", + self._delete_volume, + "volume12" + ) + + # 3. use12 can list/details the volume it created + volumes = self._list_volumes() + self.assertIn("volume12", [v.name for v in volumes]) + self._get_volume("volume12") + + # 4. user 12 can create metadata of the volumes + self._update_volume_metadata("volume12", metadata={"my": "test"}) + volume12 = self._get_volume("volume12") + self.assertIn("my", volume12.metadata) + self.assertEqual(volume12.metadata.get("my"), "test") + + # 5. user12 can not update/delete metadata of the volumes + # NOTE(tbrito): cinderclient.set_metadata uses the POST endpoint, so it's not possible to verify that atm + # self.assertRaises( + # Exception, + # self._update_volume_metadata, + # "volume12", + # metadata={"my": "test2"} + # ) + + self.assertRaisesRegex( + Exception, + "Policy doesn't allow volume:delete_volume_metadata to be performed", + self._delete_volume_metadata, + "volume12", + ["my"] + ) + + # 6. user12 can get list/detail of metadata of volumes of project1 + metadata = self._get_volume_metadata("volume12") + self.assertIn("my", volume12.metadata) + self.assertEqual(metadata.get("my"), "test") + + # 7. user12 cannot extend the volume + self.assertRaisesRegex( + Exception, + "Policy doesn't allow volume:extend to be performed", + self._extend_volume, + "volume12", + 2 + ) + + # 8. user12 can attach/detach the volume to an instance + self._create_server("instance12", image_name="cirros", flavor_name="m1.tiny") + self._add_volume_to_server("instance12", "volume12") + instance12 = self._get_server("instance12") + self.assertIn(volume12.id, [v.get("id") for v in instance12.attached_volumes]) + + self._remove_volume_from_server("volume12", "instance12") + instance12 = self._get_server("instance12") + self.assertEqual(instance12.attached_volumes, []) + + def test_uc_volume_3(self): + """ + 1. user13 cannot create/delete/update volumes of project1 + 2. user13 can list/details the volumes of project1 + 3. user13 cannot add/update/delete metadata of volumes + 4. user13 can show metadata of volumes + 5. user13 cannot update readonly flag of the volumes + """ + self.set_connections_for_user(self.user11) + self._create_volume("volume11") + self._update_volume_metadata("volume11", metadata={"my-11": "test-11"}) + self.set_connections_for_user(self.user13) + + # 1. user13 cannot create/delete/update volumes of project1 + self.assertRaisesRegex( + Exception, + "Policy doesn't allow volume:create to be performed", + self._create_volume, + "volume13" + ) + self.assertRaisesRegex( + Exception, + "Policy doesn't allow volume:delete to be performed", + self._delete_volume, + "volume11" + ) + # NOTE(tbrito): cinderclient.set_metadata uses the POST endpoint, so it's not possible to verify that atm + self.assertRaisesRegex( + Exception, + "Policy doesn't allow volume:update to be performed", + self._update_volume, + "volume11", + name="THIS IS VOLUME 13" + ) + + # 2. user13 can list/details the volumes of project1 + volumes = self._list_volumes() + self.assertIn("volume11", [v.name for v in volumes]) + + volume11 = self._get_volume("volume11") + self.assertEqual(volume11.status, "available") + + # 3. user13 cannot add/update/delete metadata of volumes + # NOTE(tbrito): cinderclient.set_metadata uses the POST endpoint, so + # it's not possible to verify that atm + # self.assertRaises( + # Exception, + # self._update_volume_metadata, + # "volume11", + # metadata={"my": "test"} + # ) + + self.assertRaisesRegex( + Exception, + "Policy doesn't allow volume:delete_volume_metadata to be performed", + self._delete_volume_metadata, + "volume11", + ["my-11"] + ) + + # 4. user13 can show metadata of volumes + volume11 = self._get_volume("volume11") + self.assertDictEqual(volume11.metadata, {"my-11": "test-11"}) + + # 5. user13 cannot update readonly flag of the volumes + # TODO(tbrito): Fix after merge of https://review.opendev.org/c/openstack/openstacksdk/+/776266 + self.assertRaisesRegex( + Exception, + "Policy doesn't allow volume:update_readonly_flag to be performed", + self._set_volume_readonly_flag, + "volume11", + readonly=True + ) + + def test_uc_volume_4(self): + """ + user11/12/13 as members of project1, + 1. cannot get list/detail of volumes of project2 + 2. cannot update/delete volumes of project2 + 3. cannot force delete volumes of project2 + """ + self.set_connections_for_user(self.user21) + self._create_volume("volume21") + + for user in (self.user11, self.user12, self.user13): + self.set_connections_for_user(user) + + # 1. cannot get list/detail of volumes of project2, + self.assertNotIn("volume21", [v.name for v in self._list_volumes()]) + + self.assertRaisesRegex( + Exception, + "No Volume found for volume21", + self._get_volume, + "volume21" + ) + + # 2. cannot update/delete volumes of project2 + self.assertRaisesRegex( + Exception, + "No Volume found for volume21", + self._update_volume_metadata, + "volume21", + metadata={"my": "test"} + ) + self.assertRaisesRegex( + Exception, + "No Volume found for volume21", + self._delete_volume, + "volume21" + ) + + # 3. cannot force delete volumes of project2 + self.assertRaisesRegex( + Exception, + "No Volume found for volume21", + self._delete_volume, + "volume21", + force=True + ) + + def test_uc_snapshot_1(self): + """ + 1. user11 create a snapshot of volume with metadata when the voluem is detached. + 2. user11 can list/detail metadata of snapshot of project1 + 3. user11 can update/delete the metadata of snapshot + 4. user11 can detail the snapshot of project1 + 5. user11 can update/delete snapshot + 6. user11 can create a snapshot of the volume when it is attached + """ + self.set_connections_for_user(self.user11) + self._create_volume("volume11") + + # 1. user11 create a snapshot of volume with metadata when the volume is detached. + self._create_snapshot(volume_name="volume11", name="snapshot11", description="snapshot11yeah", + metadata={"my": "test"}) + + # TODO(tbrito): https://review.opendev.org/c/openstack/openstacksdk/+/778757 + # 2. user11 can list/detail metadata of snapshot of project1 + metadata = self._get_snapshot_metadata("snapshot11") + self.assertIn("my", [k for k, v in metadata.items()]) + + # 3. user11 can update/delete the metadata of snapshot + self._update_snapshot_metadata("snapshot11", metadata={"my": "test2"}) + metadata = self._get_snapshot_metadata("snapshot11") + self.assertIn("test2", metadata.get("my")) + self._delete_snapshot_metadata("snapshot11", "my") + metadata = self._get_snapshot_metadata("snapshot11") + self.assertNotIn("my", metadata) + + # 4. user11 can detail the snapshot of project1 + snapshot = self._get_snapshot("snapshot11") + self.assertEqual("snapshot11yeah", snapshot.description) + + # 5. user11 can update/delete snapshot + # TODO(tbrito): + self._update_snapshot("snapshot11", description="My test description") + snapshot = self._get_snapshot("snapshot11") + self.assertEqual("My test description", snapshot.description) + self._delete_snapshot("snapshot11") + + # 6. user11 can create a snapshot of the volume when it is attached + self._create_server("instance11", image_name="cirros", flavor_name="m1.tiny") + self._add_volume_to_server("instance11", "volume11") + self._create_snapshot(volume_name="volume11", name="snapshot11.2", force=True) + + def test_uc_snapshot_2(self): + """ + 1. user12 create a snapshot of volume with metadata when the volume is detached + 2. user12 can list/detail metadata of snapshot of project1 + 3. user12 cannot update/delete the metadata of snapshot + 4. user12 can detail the snapshot of project1 + 5. user12 cannot update/delete snapshot + 6. user12 can create a snapshot of the volume wht it is attached + """ + self.set_connections_for_user(self.user11) + self._create_volume("volume11") + self._create_snapshot(volume_name="volume11", name="snapshot11", description="snapshot11yeah", + metadata={"my": "test"}) + self.set_connections_for_user(self.user12) + self._create_volume("volume12") + + # 1. user12 create a snapshot of volume with metadata when the volume is detached. + self._create_snapshot(volume_name="volume12", name="snapshot12", metadata={"my2": "test2"}) + + # TODO(tbrito): https://review.opendev.org/c/openstack/openstacksdk/+/778757 + # 2. user12 can list/detail metadata of snapshot of project1 + metadata = self._get_snapshot_metadata("snapshot11") + self.assertIn("my", [k for k, v in metadata.items()]) + + # 3. user12 cannot update/delete the metadata of snapshot + self.assertRaisesRegex( + Exception, + "Policy doesn't allow volume:update_snapshot_metadata to be performed", + self._update_snapshot_metadata, + "snapshot11", + metadata={"my": "test2"} + ) + self.assertRaisesRegex( + Exception, + "Policy doesn't allow volume:delete_snapshot_metadata to be performed", + self._delete_snapshot_metadata, + "snapshot11", + "my" + ) + + # 4. user12 can detail the snapshot of project1 + snapshot = self._get_snapshot("snapshot11") + self.assertEqual("snapshot11yeah", snapshot.description) + + # 5. user12 cannot update/delete snapshot + self.assertRaisesRegex( + Exception, + "Policy doesn't allow volume:update_snapshot to be performed", + self._update_snapshot, + "snapshot11", + description="My test description" + ) + self.assertRaisesRegex( + Exception, + "Policy doesn't allow volume:delete_snapshot to be performed", + self._delete_snapshot, + "snapshot11" + ) + + # 6. user12 can create a snapshot of the volume when it is attached + self._create_server("instance12", image_name="cirros", flavor_name="m1.tiny") + # NOTE: user12 cannot create attachment due to os_compute_api:os-volumes-attachmentos_compute_api:os-volumes-attachments:create + # Using user11 instead + self.set_connections_for_user(self.user11) + self._add_volume_to_server("instance12", "volume12") + self.set_connections_for_user(self.user12) + self._create_snapshot(volume_name="volume12", name="snapshot12.2", force=True) + + def test_uc_snapshot_3(self): + """ + 1. user13 cannot create a snapshot of volume with metadata when the volume is detached + 2. user13 can list/detail metadata of snapshot of project1 + 3. user13 cannot update/delete the metadata of snapshot + 4. user13 can detail the snapshot of project1 + 5. user13 cannot update/delete snapshot + 6. user13 cannot create a snapshot of the volume when it is attached + """ + self.set_connections_for_user(self.user11) + self._create_volume("volume11") + self._create_snapshot(volume_name="volume11", name="snapshot11", description="snapshot11yeah", + metadata={"my": "test"}) + + # 1. user13 cannot create a snapshot of volume with metadata when the volume is detached. + self.set_connections_for_user(self.user13) + self.assertRaisesRegex( + Exception, + "Policy doesn't allow volume:create_snapshot to be performed", + self._create_snapshot, + volume_name="volume11", + name="snapshot13", + metadata={"my3": "test3"} + ) + + # TODO(tbrito):https://review.opendev.org/c/openstack/openstacksdk/+/778757 + # 2. user13 can list/detail metadata of snapshot of project1 + metadata = self._get_snapshot_metadata("snapshot11") + self.assertIn("my", metadata) + + # 3. user13 cannot update/delete the metadata of snapshot + self.assertRaisesRegex( + Exception, + "Policy doesn't allow volume:update_snapshot_metadata to be performed", + self._update_snapshot_metadata, + "snapshot11", + metadata={"my": "test2"} + ) + self.assertRaisesRegex( + Exception, + "Policy doesn't allow volume:delete_snapshot_metadata to be performed", + self._delete_snapshot_metadata, + "snapshot11", + "my" + ) + + # 4. user13 can detail the snapshot of project1 + snapshot = self._get_snapshot("snapshot11") + self.assertEqual("snapshot11yeah", snapshot.description) + + # 5. user13 cannot update/delete snapshot + self.assertRaisesRegex( + Exception, + "Policy doesn't allow volume:update_snapshot to be performed", + self._update_snapshot, + "snapshot11", + description="My test description" + ) + self.assertRaisesRegex( + Exception, + "Policy doesn't allow volume:delete_snapshot to be performed", + self._delete_snapshot, + "snapshot11" + ) + + # 6. user13 cannot create a snapshot of the volume when it is attached + self.set_connections_for_user(self.user11) + self._create_server("instance11", image_name="cirros", flavor_name="m1.tiny") + self._add_volume_to_server("instance11", "volume11") + self.set_connections_for_user(self.user13) + self.assertRaisesRegex( + Exception, + "Policy doesn't allow volume:create_snapshot to be performed", + self._create_snapshot, + volume_name="volume11", + name="snapshot13" + ) + + def test_uc_snapshot_4(self): + """ + 1. user21 create a snapshot of a volume of project2 + 2. user11/user12/user13 cannot list/details the volume + """ + # 1. user21 create a snapshot of a volume of project2 + self.set_connections_for_user(self.user21) + self._create_volume("volume21") + self._create_snapshot(volume_name="volume21", name="snapshot21", metadata={"my": "test"}) + + # 2. user11/user12/user13 cannot list/details the volume + for user in (self.user11, self.user12, self.user13): + self.set_connections_for_user(user) + self.assertNotIn("volume21", self._list_snapshots()) + self.assertRaisesRegex( + Exception, + "No Snapshot found for volume21", + self._get_snapshot, + "volume21" + ) + + def test_uc_volumeupload_1(self): + """ + 1. user 11 can upload an image from volume of project1 + 2. user11 can show the new image it uploaded + """ + # 1. user 11 can upload an image from volume of project1 + self.set_connections_for_user(self.user11) + self._create_volume("volume11") + self._create_image_from_volume(volume_name="volume11", image_name="image11") + + # 2. user11 can show the new image it uploaded + self._get_image_by_name("image11") + + def test_uc_volumeupload_2(self): + """ + 1. user12/user13 cannot upload image from volume of project1 + """ + self.set_connections_for_user(self.user11) + self._create_volume("volume11") + self.set_connections_for_user(self.user12) + self.assertRaisesRegex( + Exception, + "Policy doesn't allow volume_extension:volume_actions:upload_image to be performed", + self._create_image_from_volume, + volume_name="volume11", + image_name="image11" + ) + self.set_connections_for_user(self.user13) + self.assertRaisesRegex( + Exception, + "Policy doesn't allow volume_extension:volume_actions:upload_image to be performed", + self._create_image_from_volume, + volume_name="volume11", + image_name="image11" + ) + + def test_uc_volumetransfer_1(self): + """" + 1. user11 can start a volume transfer of volume of project1 + 2. user11 can list/details the transfer it started + 3. user21 can accept the transfer in project2 + 4. user11 can delete the transfer it started + """ + self.set_connections_for_user(self.user11) + self._create_volume("volume11") + + # 1. user11 can start a volume transfer of volume of project1 + original_transfer = self._start_volume_transfer("volume11", "Transfer volume11") + + # 2. user11 can list/details the transfer it started + all_transfers = self._list_volume_transfers() + self.assertIn("Transfer volume11", [t.name for t in all_transfers]) + transfer = self._get_volume_transfer("Transfer volume11") + self.assertEqual("Transfer volume11", transfer.name) + + # 3. user21 can accept the transfer in project2 + self.set_connections_for_user(self.user21) + self._accept_volume_transfer(transfer.id, original_transfer.auth_key) + + # 4. user11 can delete the transfer it started + self.set_connections_for_user(self.user11) + self._create_volume("volume11.2") + self._start_volume_transfer("volume11.2", "Transfer volume11.2") + self._delete_volume_transfer("Transfer volume11.2") + + def test_uc_volumetransfer_2(self): + """ + 1. user12 cannot start a volume transfer of volume of project1 + 2. user21 start a volume transfer in project2 + 3. user12 can list/detail the volume transfer in project1 + 4. user12 can accept the transfer in project1 + """ + self.set_connections_for_user(self.user12) + self._create_volume("volume12") + + # 1. user12 cannot start a volume transfer of volume of project1 + self.assertRaisesRegex( + Exception, + "Policy doesn't allow volume:create_transfer to be performed", + self._start_volume_transfer, + "volume12", + "Transfer volume12" + ) + + # 2. user21 start a volume transfer in project2 + self.set_connections_for_user(self.user21) + self._create_volume("volume21") + transfer21 = self._start_volume_transfer("volume21", "Transfer volume21") + + # 3. user12 can list/detail the volume transfer in project1 + all_transfers = self._list_volume_transfers() + self.assertIn("Transfer volume21", [t.name for t in all_transfers]) + transfer = self._get_volume_transfer("Transfer volume21") + self.assertEqual(transfer.id, transfer21.id) + + # 4. user12 can accept the transfer in project1 + self._accept_volume_transfer(transfer.id, transfer21.auth_key) + + def test_uc_volumetransfer_3(self): + """ + 1. user11 start a volume transfer of volume of project1 + 2. user13 can list/details the transfer it started + 3. user13 cannot start the transfer in project1 + 4. user21 start a volume transfer in project2 + 5. user13 cannot accept the transfer in project1 + """ + self.set_connections_for_user(self.user11) + self._create_volume("volume11") + + # 1. user11 start a volume transfer of volume of project1 + self._start_volume_transfer("volume11", "Transfer volume11") + + # 2. user13 can list/details the transfer it started + self.set_connections_for_user(self.user13) + all_transfers = self._list_volume_transfers() + self.assertIn("Transfer volume11", [t.name for t in all_transfers]) + transfer = self._get_volume_transfer("Transfer volume11") + self.assertEqual("Transfer volume11", transfer.name) + + # 3. user13 cannot start the transfer in project1 + self.assertRaisesRegex( + Exception, + "Policy doesn't allow volume:create_transfer to be performed", + self._start_volume_transfer, + "volume11", + "Another transfer volume11" + ) + + # 4. user21 start a volume transfer in project2 + self.set_connections_for_user(self.user21) + self._create_volume("volume21") + transfer21 = self._start_volume_transfer("volume21", "Transfer volume21") + + # 5. user13 cannot accept the transfer in project1 + self.set_connections_for_user(self.user13) + self.assertRaisesRegex( + Exception, + "Policy doesn't allow volume:accept_transfer to be performed", + self._accept_volume_transfer, + transfer21.id, + transfer21.auth_key + ) + + def test_uc_volumebackup_1(self): + """ + 1. user11/12 can create a volume backup of project1, + 2. user13 cannot create a volume backup of project1, + 3. user11/user12/user13 can list/details the created backup + 4. user 11 can restore the backup + 5. user12 CAN restore the backup (or else nova migration will fail) + 6. user12/user13 cannot restore the backup + 7. user11 can delete the backup + 8. use12/user13 cannot delete the backup + """ + self.set_connections_for_user(self.user11) + self._create_volume("volume11") + + # 1. user11/12 can create a volume backup of project1 + self._create_volume_backup("volume11", "volume11-bkp11") + self.set_connections_for_user(self.user12) + self._create_volume_backup("volume11", "volume11-bkp12") + + # 2. user13 cannot create a volume backup of project1 + self.set_connections_for_user(self.user13) + self.assertRaisesRegex( + Exception, + "Policy doesn't allow backup:create to be performed", + self._create_volume_backup, + "volume11", + "volume11-bkp13" + ) + + # 3. user11/user12/user13 can list/details the created backup + for user in (self.user11, self.user12, self.user13): + self.set_connections_for_user(user) + self._get_volume_backup("volume11-bkp11") + + # 4. user 11 can restore the backup + self.set_connections_for_user(self.user11) + self._restore_volume_backup("volume11-bkp12", "restored-volume11") + self.assertIn("restored-volume11", [v.name for v in self._list_volumes()]) + + # 5. user12 CAN restore the backup (or else nova migration will fail) + self.set_connections_for_user(self.user12) + self.assertRaisesRegex( + Exception, + "Policy doesn't allow backup:restore to be performed", + self._restore_volume_backup, "volume11-bkp12", "restored-volume12" + ) + self.assertNotIn("restored-volume12", [v.name for v in self._list_volumes()]) + + # 6. user13 cannot restore the backup + self.set_connections_for_user(self.user13) + self.assertRaisesRegex( + Exception, + "Policy doesn't allow backup:restore to be performed", + self._restore_volume_backup, "volume11-bkp12", "restored-volume13" + ) + self.assertNotIn("restored-volume13", [v.name for v in self._list_volumes()]) + + # 7. user11 can delete the backup + self.set_connections_for_user(self.user11) + self._delete_volume_backup("volume11-bkp12") + + # 8. use12/user13 cannot delete the backup + for user in (self.user12, self.user13): + self.set_connections_for_user(user) + self.assertRaisesRegex( + Exception, + "Policy doesn't allow backup:delete to be performed", + self._delete_volume_backup, + "volume11-bkp11" + ) diff --git a/enhanced-policies/tox.ini b/enhanced-policies/tox.ini new file mode 100644 index 00000000..eadcc1c8 --- /dev/null +++ b/enhanced-policies/tox.ini @@ -0,0 +1,50 @@ +[tox] +minversion = 3.9.0 +envlist = policy-functional +skipsdist = True +ignore_basepython_conflict = True + +[testenv] +usedevelop = True +skip_install = True +basepython = python3 +setenv = + VIRTUAL_ENV={envdir} + LANG=en_US.UTF-8 + LANGUAGE=en_US:en + LC_ALL=C + OS_LOG_CAPTURE={env:OS_LOG_CAPTURE:false} + OS_STDOUT_CAPTURE={env:OS_STDOUT_CAPTURE:true} + OS_STDERR_CAPTURE={env:OS_STDERR_CAPTURE:true} +deps = + -r{toxinidir}/test-requirements.txt + + +[testenv:policy-functional] +commands = pytest --collect-only tests/ + +[testenv:pep8] +deps = + hacking>=3.1.0,<4.0.0 # Apache-2.0 + flake8-import-order>=0.17.1 # LGPLv3 + pycodestyle>=2.0.0,<2.7.0 # MIT + Pygments>=2.2.0 # BSD + doc8>=0.8.0 # Apache 2.0 +commands = + flake8 {posargs} + doc8 doc/source README.rst + +[flake8] +application-import-names = tests +# The following are ignored on purpose. It's not super worth it to fix them. +# However, if you feel strongly about it, patches will be accepted to fix them +# if they fix ALL of the occurances of one and only one of them. +# H238 New Style Classes are the default in Python3 +# H4 Are about docstrings and there's just a huge pile of pre-existing issues. +# W503 Is supposed to be off by default but in the latest pycodestyle isn't. +# Also, both openstacksdk and Donald Knuth disagree with the rule. Line +# breaks should occur before the binary operator for readability. +ignore = H238,H4,W503 +import-order-style = pep8 +show-source = True +exclude=.venv,.git,.tox,dist,doc,*lib/python*,*egg,build,openstacksdk-0.55.0 \ No newline at end of file