RBAC Patch 2: Cinder tests and common test files
This patch chain aims to suggest a set of default policies for user management on stx-openstack. We suggest the creation of the project_admin and project_readonly roles and provide some policies to fine tune the access control over the Openstack services to those roles, as described on README.md. Also, we provide a set of tests to ensure the policies and permissions are all working as expected on site for the cloud administrators. This commit includes Cinder related tests and functions, along with common test functions used by multiple OpenStack services. Story: 2008910 Task: 42501 Signed-off-by: Heitor Matsui <heitorvieira.matsui@windriver.com> Signed-off-by: Thiago Brito <thiago.brito@windriver.com> Co-authored-by: Miriam Yumi Peixoto <miriam.yumipeixoto@windriver.com> Co-authored-by: Leonardo Zaccarias <leonardo.zaccarias@windriver.com> Co-authored-by: Rogerio Oliveira Ferraz <rogeriooliveira.ferraz@windriver.com> Change-Id: I6b43bc584e470f022fb08a8a4cf741c188dfe80d
This commit is contained in:
parent
963e63cd55
commit
ecb1e24972
15
enhanced-policies/test-requirements.txt
Normal file
15
enhanced-policies/test-requirements.txt
Normal file
@ -0,0 +1,15 @@
|
||||
pytest==6.2.2
|
||||
openstacksdk==0.55.0
|
||||
os-client-config==2.1.0
|
||||
|
||||
#Nova
|
||||
python-novaclient==17.4.0
|
||||
|
||||
#Cinder
|
||||
python-cinderclient==7.3.0
|
||||
|
||||
#Glance
|
||||
python-glanceclient==3.3.0
|
||||
|
||||
# Neutron
|
||||
netaddr
|
0
enhanced-policies/tests/__init__.py
Normal file
0
enhanced-policies/tests/__init__.py
Normal file
147
enhanced-policies/tests/conftest.py
Normal file
147
enhanced-policies/tests/conftest.py
Normal file
@ -0,0 +1,147 @@
|
||||
#
|
||||
# Copyright (c) 2021 Wind River Systems, Inc.
|
||||
#
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
#
|
||||
# All Rights Reserved.
|
||||
#
|
||||
|
||||
from pytest import fixture
|
||||
|
||||
from tests.fv_rbac import debug1
|
||||
from tests.fv_rbac import OpenStackNetworkingSetup
|
||||
from tests.fv_rbac import OpenStackTestingSetup
|
||||
|
||||
|
||||
def pytest_addoption(parser):
|
||||
parser.addoption(
|
||||
"--env",
|
||||
action="store",
|
||||
default="stx",
|
||||
help="Environment to run tests against (stx or wro). Default: stx"
|
||||
)
|
||||
|
||||
|
||||
@fixture(scope="session")
|
||||
def env(request):
|
||||
return request.config.getoption("--env")
|
||||
|
||||
|
||||
@fixture(scope='session')
|
||||
def rbac_setup(request):
|
||||
|
||||
if debug1:
|
||||
print("\nSession Initialization")
|
||||
|
||||
cfg = OpenStackTestingSetup()
|
||||
|
||||
# Create projects
|
||||
cfg._create_project("project1", description="project1 for rbac test1")
|
||||
cfg._create_project("project2", description="project2 for rbac test1")
|
||||
|
||||
# NOTE(tbrito): assume roles are already created
|
||||
# Create roles
|
||||
# for role in ["project_readonly", "project_admin"]:
|
||||
# cfg._create_role(role)
|
||||
|
||||
# Create users
|
||||
for user in cfg.users:
|
||||
cfg._create_user(user)
|
||||
|
||||
# Assign Roles to Users
|
||||
cfg._grant_role("project_admin", "user11", "project1")
|
||||
cfg._grant_role("member", "user12", "project1")
|
||||
cfg._grant_role("project_readonly", "user13", "project1")
|
||||
cfg._grant_role("admin", "user02", "project2")
|
||||
cfg._grant_role("project_admin", "user21", "project2")
|
||||
cfg._grant_role("member", "user22", "project2")
|
||||
cfg._grant_role("project_readonly", "user23", "project2")
|
||||
|
||||
image = cfg._create_admin_image()
|
||||
|
||||
def teardown():
|
||||
|
||||
if debug1:
|
||||
print("\nSession Teardown")
|
||||
|
||||
cfg._delete_admin_image(image)
|
||||
|
||||
cfg._revoke_role("project_admin", "user11", "project1")
|
||||
cfg._revoke_role("member", "user12", "project1")
|
||||
cfg._revoke_role("project_readonly", "user13", "project1")
|
||||
cfg._revoke_role("admin", "user02", "project2")
|
||||
cfg._revoke_role("project_admin", "user21", "project2")
|
||||
cfg._revoke_role("member", "user22", "project2")
|
||||
cfg._revoke_role("project_readonly", "user23", "project2")
|
||||
|
||||
for user in cfg.users:
|
||||
cfg._delete_user(user)
|
||||
|
||||
# NOTE(tbrito): Roles should NOT be removed on a live deployment
|
||||
# for role in ["project_readonly", "project_admin"]:
|
||||
# cfg._delete_role(role)
|
||||
|
||||
for project in ["project1", "project2"]:
|
||||
cfg._delete_project(project)
|
||||
|
||||
request.addfinalizer(teardown)
|
||||
|
||||
return cfg
|
||||
|
||||
|
||||
@fixture(scope='session')
|
||||
def network_admin_setup(request, rbac_setup, env):
|
||||
|
||||
cfg = OpenStackNetworkingSetup(env)
|
||||
|
||||
# Create segment ranges based on projects
|
||||
cfg._create_network_segment_range(
|
||||
"group0-ext-r0",
|
||||
shared=True,
|
||||
network_type="vlan",
|
||||
physical_network="group0-data0",
|
||||
minimum=10, maximum=10
|
||||
)
|
||||
|
||||
cfg._create_network_segment_range(
|
||||
"group0-data0-r0",
|
||||
project_name="project1",
|
||||
shared=False,
|
||||
network_type="vlan",
|
||||
physical_network="group0-data0",
|
||||
minimum=400, maximum=499
|
||||
)
|
||||
|
||||
cfg._create_network_segment_range(
|
||||
"group0-data0b-r0",
|
||||
shared=True,
|
||||
network_type="vlan",
|
||||
physical_network="group0-data0",
|
||||
minimum=500, maximum=599
|
||||
)
|
||||
|
||||
cfg._create_network_segment_range(
|
||||
"group0-data1-r0",
|
||||
project="project2",
|
||||
shared=False,
|
||||
network_type="vlan",
|
||||
physical_network="group0-data1",
|
||||
minimum=600, maximum=699
|
||||
)
|
||||
|
||||
if env == "wro":
|
||||
cfg._create_qos("admin-qos", weight=16,
|
||||
description="External Network Policy")
|
||||
|
||||
def network_admin_teardown():
|
||||
cfg._delete_network_segment_range("group0-data1-r0")
|
||||
cfg._delete_network_segment_range("group0-data0b-r0")
|
||||
cfg._delete_network_segment_range("group0-data0-r0")
|
||||
cfg._delete_network_segment_range("group0-ext-r0")
|
||||
|
||||
if env == "wro":
|
||||
cfg._delete_qos("admin-qos")
|
||||
|
||||
request.addfinalizer(network_admin_teardown)
|
||||
|
||||
return cfg
|
636
enhanced-policies/tests/fv_rbac.py
Normal file
636
enhanced-policies/tests/fv_rbac.py
Normal file
@ -0,0 +1,636 @@
|
||||
#
|
||||
# Copyright (c) 2021 Wind River Systems, Inc.
|
||||
#
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
#
|
||||
# All Rights Reserved.
|
||||
#
|
||||
|
||||
import os
|
||||
|
||||
from cinderclient import client as CinderClient
|
||||
from glanceclient import Client as GlanceClient
|
||||
from keystoneauth1 import loading
|
||||
from keystoneauth1 import session
|
||||
from novaclient import client as NovaClient
|
||||
import openstack
|
||||
import os_client_config
|
||||
import pytest
|
||||
|
||||
TEST_CLOUD = os.getenv("OS_CLOUD")
|
||||
|
||||
debug = False # For general issues
|
||||
debug1 = True # For teardown issues
|
||||
debug2 = False # For temporary issues
|
||||
|
||||
|
||||
class OpenStackTestingSetup:
|
||||
|
||||
def __init__(self):
|
||||
self.os_sdk_admin_conn = openstack.connect(cloud=TEST_CLOUD)
|
||||
|
||||
self.user11 = {
|
||||
"name": "user11",
|
||||
"password": "User11@Project1",
|
||||
"project": "project1"
|
||||
}
|
||||
self.user12 = {
|
||||
"name": "user12",
|
||||
"password": "User12@Project1",
|
||||
"project": "project1"
|
||||
}
|
||||
self.user13 = {
|
||||
"name": "user13",
|
||||
"password": "User13@Project1",
|
||||
"project": "project1"
|
||||
}
|
||||
self.user02 = {
|
||||
"name": "user02",
|
||||
"password": "user02@Project2",
|
||||
"project": "project2"
|
||||
}
|
||||
self.user21 = {
|
||||
"name": "user21",
|
||||
"password": "User21@Project2",
|
||||
"project": "project2"
|
||||
}
|
||||
self.user22 = {
|
||||
"name": "user22",
|
||||
"password": "User22@Project2",
|
||||
"project": "project2"
|
||||
}
|
||||
self.user23 = {
|
||||
"name": "user23",
|
||||
"password": "User23@Project2",
|
||||
"project": "project2"
|
||||
}
|
||||
|
||||
self.users = (
|
||||
self.user11,
|
||||
self.user12,
|
||||
self.user13,
|
||||
self.user02,
|
||||
self.user21,
|
||||
self.user22,
|
||||
self.user23
|
||||
)
|
||||
|
||||
def _get_project(self, name):
|
||||
return self.os_sdk_admin_conn.get_project(name)
|
||||
|
||||
def _create_project(self, name, description):
|
||||
project = self._get_project(name)
|
||||
if project is None:
|
||||
return self.os_sdk_admin_conn.create_project(
|
||||
name,
|
||||
domain_id="default",
|
||||
description=description
|
||||
)
|
||||
return project
|
||||
|
||||
def _delete_project(self, name):
|
||||
self.os_sdk_admin_conn.delete_project(name)
|
||||
|
||||
def _create_role(self, name):
|
||||
role = self.os_sdk_admin_conn.get_role(name)
|
||||
if role is None:
|
||||
return self.os_sdk_admin_conn.create_role(name)
|
||||
return role
|
||||
|
||||
def _delete_role(self, name):
|
||||
self.os_sdk_admin_conn.delete_role(name)
|
||||
|
||||
def _grant_role(self, name, user_name, project_name):
|
||||
self.os_sdk_admin_conn.grant_role(
|
||||
name,
|
||||
user=user_name,
|
||||
project=project_name
|
||||
)
|
||||
|
||||
def _revoke_role(self, name, user_name, project_name):
|
||||
self.os_sdk_admin_conn.revoke_role(
|
||||
name,
|
||||
user=user_name,
|
||||
project=project_name
|
||||
)
|
||||
|
||||
def _create_user(self, user):
|
||||
user_obj = self.os_sdk_admin_conn.identity.find_user(user.get("name"))
|
||||
if user_obj is None:
|
||||
return self.os_sdk_admin_conn.identity.create_user(
|
||||
name=user.get("name"),
|
||||
password=user.get("password"),
|
||||
default_project=user.get("project"))
|
||||
return user_obj
|
||||
|
||||
def _delete_user(self, user):
|
||||
self.os_sdk_admin_conn.delete_user(user.get("name"))
|
||||
|
||||
def _create_admin_image(self):
|
||||
# wget http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-disk.img
|
||||
image = self.os_sdk_admin_conn.image.find_image("cirros")
|
||||
if not image:
|
||||
return self.os_sdk_admin_conn.create_image(
|
||||
"cirros",
|
||||
filename="cirros-0.3.4-x86_64-disk.img",
|
||||
disk_format="qcow2",
|
||||
container_format="bare",
|
||||
wait=True,
|
||||
visibility="public"
|
||||
)
|
||||
return image
|
||||
|
||||
def _delete_admin_image(self, image):
|
||||
if image:
|
||||
self.os_sdk_admin_conn.delete_image(image.id)
|
||||
|
||||
|
||||
class OpenStackNetworkingSetup(OpenStackTestingSetup):
|
||||
|
||||
def __init__(self, env):
|
||||
super(OpenStackNetworkingSetup, self).__init__()
|
||||
self.env = env
|
||||
|
||||
def _create_network_segment_range(self, name, project_name=None, **kwargs):
|
||||
sr = self.os_sdk_admin_conn.network.find_network_segment_range(name)
|
||||
if sr is None:
|
||||
project_id = None
|
||||
if project_name:
|
||||
project_id = self.os_sdk_admin_conn.get_project(
|
||||
project_name).id
|
||||
|
||||
if project_id is None:
|
||||
return self.os_sdk_admin_conn.network. \
|
||||
create_network_segment_range(name=name, **kwargs)
|
||||
else:
|
||||
return self.os_sdk_admin_conn.network. \
|
||||
create_network_segment_range(
|
||||
name=name,
|
||||
project_id=project_id,
|
||||
**kwargs
|
||||
)
|
||||
return sr
|
||||
|
||||
def _delete_network_segment_range(self, name_or_id):
|
||||
return self.os_sdk_admin_conn.network.delete_network(name_or_id)
|
||||
|
||||
|
||||
class OpenStackRouterInterface:
|
||||
def __init__(self, router_name_or_id, subnet_name_or_id):
|
||||
self.router_name_or_id = router_name_or_id
|
||||
self.subnet_name_or_id = subnet_name_or_id
|
||||
|
||||
|
||||
class OpenStackBasicTesting():
|
||||
os_sdk_conn = None
|
||||
os_sdk_admin_conn = None
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# Tear down
|
||||
# -------------------------------------------------------------------------
|
||||
|
||||
instances_clearing = list()
|
||||
images_clearing = list()
|
||||
snapshots_clearing = list()
|
||||
volumes_clearing = list()
|
||||
volume_bkps_clearing = list()
|
||||
security_groups_clearing = list()
|
||||
floating_ips_clearing = list()
|
||||
interfaces_clearing = list()
|
||||
routers_clearing = list()
|
||||
trunks_clearing = list()
|
||||
ports_clearing = list()
|
||||
subnets_clearing = list()
|
||||
networks_clearing = list()
|
||||
subnet_pools_clearing = list()
|
||||
address_scopes_clearing = list()
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
def tc_teardown(self, request):
|
||||
|
||||
def teardown():
|
||||
|
||||
if debug1:
|
||||
print("\nTC Teardown")
|
||||
|
||||
self.os_sdk_conn = self.os_sdk_admin_conn
|
||||
for instance_id in self.instances_clearing:
|
||||
self._delete_server(instance_id, autoclear=False)
|
||||
for image_id in self.images_clearing:
|
||||
self._delete_image(image_id, autoclear=False)
|
||||
for snap_id in self.snapshots_clearing:
|
||||
self._delete_snapshot(snap_id, autoclear=False)
|
||||
for vol_id in self.volumes_clearing:
|
||||
self._delete_volume(vol_id, autoclear=False)
|
||||
for bkp_id in self.volume_bkps_clearing:
|
||||
self._delete_volume_backup(bkp_id, autoclear=False)
|
||||
for sg_id in self.security_groups_clearing:
|
||||
self._delete_security_group(sg_id, autoclear=False)
|
||||
for fip_id in self.floating_ips_clearing:
|
||||
self._delete_floatingip(fip_id, autoclear=False)
|
||||
for ri in self.interfaces_clearing:
|
||||
self._delete_interface_from_router(ri, autoclear=False)
|
||||
for router_id in self.routers_clearing:
|
||||
self._delete_router(router_id, autoclear=False)
|
||||
for trunk_id in self.trunks_clearing:
|
||||
self._delete_trunk(trunk_id, autoclear=False)
|
||||
for port_id in self.ports_clearing:
|
||||
self._delete_port(port_id, autoclear=False)
|
||||
for subnet_id in self.subnets_clearing:
|
||||
self._delete_subnet(subnet_id, autoclear=False)
|
||||
for network_id in self.networks_clearing:
|
||||
self._delete_network(network_id, autoclear=False)
|
||||
for subnet_pool_id in self.subnet_pools_clearing:
|
||||
self._delete_subnetpool(subnet_pool_id, autoclear=False)
|
||||
for address_scope_id in self.address_scopes_clearing:
|
||||
self._delete_addrscope(address_scope_id, autoclear=False)
|
||||
|
||||
self.instances_clearing.clear()
|
||||
self.images_clearing.clear()
|
||||
self.snapshots_clearing.clear()
|
||||
self.volumes_clearing.clear()
|
||||
self.volume_bkps_clearing.clear()
|
||||
self.security_groups_clearing.clear()
|
||||
self.floating_ips_clearing.clear()
|
||||
self.interfaces_clearing.clear()
|
||||
self.routers_clearing.clear()
|
||||
self.trunks_clearing.clear()
|
||||
self.ports_clearing.clear()
|
||||
self.subnets_clearing.clear()
|
||||
self.networks_clearing.clear()
|
||||
self.subnet_pools_clearing.clear()
|
||||
self.address_scopes_clearing.clear()
|
||||
|
||||
request.addfinalizer(teardown)
|
||||
|
||||
return
|
||||
|
||||
@pytest.fixture(scope='class')
|
||||
def create_external_network(self, request):
|
||||
self.set_connections_for_user(self.user02)
|
||||
|
||||
args = {'router:external': True}
|
||||
network = self._create_network(
|
||||
"extnet21",
|
||||
shared=True,
|
||||
autoclear=False,
|
||||
**args)
|
||||
assert network is not None
|
||||
assert "extnet21" in [n.name for n in self._list_networks()]
|
||||
|
||||
subnet = self._create_subnet(
|
||||
"extsubnet21",
|
||||
"extnet21",
|
||||
cidr="192.168.195.0/24",
|
||||
gateway_ip="192.168.195.1", autoclear=False
|
||||
)
|
||||
assert subnet is not None
|
||||
subnet = self._get_subnet("extsubnet21")
|
||||
|
||||
yield network, subnet
|
||||
|
||||
self.set_connections_for_user(self.user02)
|
||||
self._delete_subnet("extsubnet21", autoclear=False)
|
||||
self._delete_network("extnet21", autoclear=False)
|
||||
|
||||
@pytest.fixture(scope='class')
|
||||
def create_router_vr11(self, request, create_external_network):
|
||||
extnet, extsubnet = create_external_network
|
||||
|
||||
self.set_connections_for_user(self.user11)
|
||||
vr11 = self._create_router("vr11", extnet.name, autoclear=False)
|
||||
vr11 = self._get_router("vr11")
|
||||
assert vr11 is not None
|
||||
|
||||
yield vr11
|
||||
|
||||
self.set_connections_for_user(self.user11)
|
||||
self._delete_router("vr11", autoclear=False)
|
||||
|
||||
@pytest.fixture(scope='class')
|
||||
def create_router_vr21(self, request, create_external_network):
|
||||
extnet, extsubnet = create_external_network
|
||||
|
||||
self.set_connections_for_user(self.user02)
|
||||
vr21 = self._create_router("vr21", extnet.name, autoclear=False)
|
||||
vr21 = self._get_router("vr21")
|
||||
assert vr21 is not None
|
||||
|
||||
yield vr21
|
||||
|
||||
self.set_connections_for_user(self.user02)
|
||||
self._delete_router("vr21", autoclear=False)
|
||||
|
||||
def _get_conn_for(self, user):
|
||||
conn = self.os_sdk_admin_conn
|
||||
user_obj = conn.identity.find_user(user.get("name"))
|
||||
project = conn.get_project(user.get("project"))
|
||||
|
||||
return openstack.connection.Connection(
|
||||
auth=dict(
|
||||
auth_url=conn.auth.get("auth_url"),
|
||||
username=user.get("name"),
|
||||
password=user.get("password"),
|
||||
project_id=project.id,
|
||||
user_domain_id=user_obj.domain_id
|
||||
)
|
||||
)
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# API Session Connection
|
||||
# -------------------------------------------------------------------------
|
||||
# OpenStack Python API
|
||||
def _get_session_for_user(self, user):
|
||||
creds = os_client_config.OpenStackConfig() \
|
||||
.get_one_cloud(cloud=TEST_CLOUD).get_auth_args()
|
||||
sloader = loading.get_plugin_loader("password")
|
||||
auth = sloader.load_from_options(
|
||||
auth_url=self.os_sdk_admin_conn.auth.get("auth_url"),
|
||||
username=user.get("name"),
|
||||
password=user.get("password"),
|
||||
project_name=user.get("project"),
|
||||
project_domain_name=creds['project_domain_name'],
|
||||
user_domain_name=creds['user_domain_name'],
|
||||
)
|
||||
return session.Session(auth=auth)
|
||||
|
||||
# CinderClient Python API
|
||||
def _get_cclient_for(self, user):
|
||||
self.sess = self._get_session_for_user(user)
|
||||
return CinderClient.Client('3', session=self.sess, http_log_debug=True)
|
||||
|
||||
# GlanceClient Python API
|
||||
def _get_gclient_for(self, user):
|
||||
self.sess = self._get_session_for_user(user)
|
||||
return GlanceClient('2', session=self.sess)
|
||||
|
||||
# NovaClient Python API
|
||||
def _get_nclient_for(self, user):
|
||||
self.sess = self._get_session_for_user(user)
|
||||
return NovaClient.Client('2', session=self.sess)
|
||||
|
||||
def _get_os_conn_for(self, user):
|
||||
self.sess = self._get_session_for_user(user)
|
||||
return openstack.connection.Connection(session=self.sess)
|
||||
|
||||
def set_connections_for_user(self, user):
|
||||
self.os_sdk_conn = self._get_os_conn_for(user)
|
||||
self.nc = self._get_nclient_for(user)
|
||||
self.cc = self._get_cclient_for(user)
|
||||
self.gc = self._get_gclient_for(user)
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# Volume methods - Cinder
|
||||
# -------------------------------------------------------------------------
|
||||
|
||||
def _create_volume(self, volume_name, autoclear=True):
|
||||
vol = self.os_sdk_conn.block_storage.create_volume(
|
||||
name=volume_name,
|
||||
size=1,
|
||||
image="cirros",
|
||||
wait=True
|
||||
)
|
||||
if debug1:
|
||||
print("created volume: " + vol.name + " id: " + vol.id)
|
||||
if autoclear:
|
||||
self.volumes_clearing.append(vol.id)
|
||||
self.os_sdk_conn.block_storage.wait_for_status(vol, status="available")
|
||||
return vol
|
||||
|
||||
def _create_image_from_volume(self, volume_name, image_name,
|
||||
autoclear=True):
|
||||
volume = self._get_volume(volume_name)
|
||||
self.cc.volumes.upload_to_image(
|
||||
volume,
|
||||
False,
|
||||
image_name,
|
||||
container_format="bare",
|
||||
disk_format="raw"
|
||||
)
|
||||
image = self._get_image(image_name)
|
||||
if debug1:
|
||||
print("created image: " + image.name + " id: " + image.id)
|
||||
if autoclear:
|
||||
self.images_clearing.append(image.id)
|
||||
return image
|
||||
|
||||
def _delete_volume(self, volume_name, autoclear=True, **kwargs):
|
||||
vol = self.os_sdk_conn.block_storage.find_volume(
|
||||
volume_name,
|
||||
ignore_missing=False
|
||||
)
|
||||
if vol:
|
||||
self.os_sdk_conn.block_storage.delete_volume(vol.id)
|
||||
if debug1:
|
||||
print("deleted volume: " + vol.name + " id: " + vol.id)
|
||||
if autoclear:
|
||||
self.volumes_clearing.remove(vol.id)
|
||||
|
||||
def _list_volumes(self):
|
||||
volumes = self.cc.volumes.list()
|
||||
return volumes
|
||||
|
||||
def _get_volume(self, volume_name_or_id):
|
||||
volume = self._find_volume(volume_name_or_id, ignore_missing=False)
|
||||
return self.os_sdk_conn.block_storage.get_volume(volume)
|
||||
|
||||
def _find_volume(self, volume_name_or_id, ignore_missing=True):
|
||||
return self.os_sdk_conn.block_storage.find_volume(
|
||||
volume_name_or_id,
|
||||
ignore_missing=ignore_missing
|
||||
)
|
||||
|
||||
def _update_volume(self, volume_name, **kwargs):
|
||||
vol = self.os_sdk_conn.update_volume(volume_name, **kwargs)
|
||||
return vol
|
||||
|
||||
def _get_volume_metadata(self, volume_name):
|
||||
vol = self.cc.volumes.get(self._get_volume(volume_name).id)
|
||||
# NOTE(tbrito): cinderclient doesn't call
|
||||
# /v3/{project_id}/volumes/{volume_id}/metadata explicitly
|
||||
return vol.metadata
|
||||
|
||||
def _update_volume_metadata(self, volume_name, metadata):
|
||||
vol = self.cc.volumes.get(self._get_volume(volume_name).id)
|
||||
# TODO(tbrito): Refactor after
|
||||
# https://review.opendev.org/c/openstack/openstacksdk/+/777801 merges
|
||||
return self.cc.volumes.set_metadata(vol, metadata)
|
||||
|
||||
def _delete_volume_metadata(self, volume_name, metadata_keys: list):
|
||||
vol = self.cc.volumes.get(self._get_volume(volume_name).id)
|
||||
return self.cc.volumes.delete_metadata(vol, metadata_keys)
|
||||
|
||||
def _set_volume_readonly_flag(self, volume_name, readonly=True):
|
||||
vol = self.cc.volumes.get(self._get_volume(volume_name).id)
|
||||
return self.cc.volumes.update_readonly_flag(vol, readonly)
|
||||
|
||||
def _retype_volume(self, volume_name, volume_type,
|
||||
migration_policy="never"):
|
||||
vol = self.cc.volumes.get(self._get_volume(volume_name).id)
|
||||
return self.cc.volumes.retype(vol, volume_type, migration_policy)
|
||||
|
||||
def _extend_volume(self, volume_name, size):
|
||||
vol = self._get_volume(volume_name)
|
||||
# NOTE(tbrito): Can't use SDK method to extend because it doesn't
|
||||
# raise exceptions, only get message
|
||||
# self.os_sdk_conn.block_storage.extend_volume(vol, size=size)
|
||||
self.cc.volumes.extend(vol, size)
|
||||
vol = self.os_sdk_conn.block_storage.get_volume(vol)
|
||||
self.os_sdk_conn.block_storage.wait_for_status(vol, status="available")
|
||||
return self._get_volume(volume_name)
|
||||
|
||||
def _add_volume_to_server(self, instance_name_or_id, volume_name_or_id):
|
||||
instance = self._get_server(instance_name_or_id)
|
||||
volume = self._get_volume(volume_name_or_id)
|
||||
self.os_sdk_conn.compute.create_volume_attachment(instance,
|
||||
volume_id=volume.id)
|
||||
self.os_sdk_conn.block_storage.wait_for_status(volume, status="in-use")
|
||||
|
||||
def _remove_volume_from_server(self, instance_name_or_id,
|
||||
volume_name_or_id):
|
||||
instance = self._get_server(instance_name_or_id)
|
||||
volume = self._get_volume(volume_name_or_id)
|
||||
for attached_volume in instance.attached_volumes:
|
||||
if attached_volume.get("id") == volume.id:
|
||||
self.os_sdk_conn.compute.delete_volume_attachment(
|
||||
attached_volume.get("id"),
|
||||
instance
|
||||
)
|
||||
self.os_sdk_conn.block_store.wait_for_status(
|
||||
volume,
|
||||
status='available',
|
||||
failures=['error'],
|
||||
wait=6 * 60
|
||||
)
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# Volume transfer methods
|
||||
# -------------------------------------------------------------------------
|
||||
def _start_volume_transfer(self, volume_name, transfer_name):
|
||||
volume = self._get_volume(volume_name)
|
||||
return self.cc.transfers.create(volume.id, transfer_name)
|
||||
|
||||
def _get_volume_transfer(self, transfer_name):
|
||||
return self.cc.transfers.get(
|
||||
self.cc.transfers.find(name=transfer_name).id
|
||||
)
|
||||
|
||||
def _accept_volume_transfer(self, transfer_id, auth_key):
|
||||
return self.cc.transfers.accept(transfer_id, auth_key)
|
||||
|
||||
def _list_volume_transfers(self):
|
||||
return self.cc.transfers.list()
|
||||
|
||||
def _delete_volume_transfer(self, transfer_name):
|
||||
return self.cc.transfers.delete(
|
||||
self.cc.transfers.find(name=transfer_name).id
|
||||
)
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# Volume backup methods
|
||||
# -------------------------------------------------------------------------
|
||||
|
||||
def _create_volume_backup(self, volume_name, backup_name, autoclear=True):
|
||||
vol = self._get_volume(volume_name)
|
||||
bkp = self.os_sdk_conn.block_storage.create_backup(
|
||||
volume_id=vol.id,
|
||||
name=backup_name
|
||||
)
|
||||
if debug1:
|
||||
print("created volume backup: " + bkp.name + " id: " + bkp.id)
|
||||
if autoclear:
|
||||
self.volume_bkps_clearing.append(bkp.id)
|
||||
self.os_sdk_conn.block_storage.wait_for_status(bkp, status="available")
|
||||
return bkp
|
||||
|
||||
def _delete_volume_backup(self, backup_name, autoclear=True):
|
||||
bkp = self._get_volume_backup(backup_name)
|
||||
if bkp:
|
||||
self.os_sdk_conn.block_storage.delete_backup(bkp.id)
|
||||
if debug1:
|
||||
print("deleted volume backup: " + bkp.name + " id: " + bkp.id)
|
||||
if autoclear:
|
||||
self.volume_bkps_clearing.remove(bkp.id)
|
||||
|
||||
def _restore_volume_backup(self, backup_name, new_volume_name,
|
||||
autoclear=True):
|
||||
bkp = self._get_volume_backup(backup_name)
|
||||
self.os_sdk_conn.block_storage.restore_backup(bkp,
|
||||
name=new_volume_name)
|
||||
bkp = self._get_volume_backup(backup_name)
|
||||
self.os_sdk_conn.block_storage.wait_for_status(bkp, status="available")
|
||||
vol = self._get_volume(new_volume_name)
|
||||
if autoclear:
|
||||
self.volumes_clearing.append(vol.id)
|
||||
return vol
|
||||
|
||||
def _get_volume_backup(self, backup_name):
|
||||
return self.os_sdk_conn.block_storage.get_backup(
|
||||
self.os_sdk_conn.block_storage.find_backup(backup_name))
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# Snapshot methods
|
||||
# -------------------------------------------------------------------------
|
||||
def _create_snapshot(self, volume_name, name, autoclear=True, **kwargs):
|
||||
volume = self._get_volume(volume_name)
|
||||
snapshot = self.os_sdk_conn.block_storage.create_snapshot(
|
||||
volume_id=volume.id,
|
||||
name=name,
|
||||
**kwargs
|
||||
)
|
||||
if debug1:
|
||||
print("created snapshot: " + snapshot.name + " id: " + snapshot.id)
|
||||
if autoclear:
|
||||
self.snapshots_clearing.append(snapshot.id)
|
||||
self.os_sdk_conn.block_storage.wait_for_status(
|
||||
snapshot,
|
||||
status="available"
|
||||
)
|
||||
return snapshot
|
||||
|
||||
def _delete_snapshot(self, snapshot_name, autoclear=True):
|
||||
snapshot = self.os_sdk_conn.block_storage.find_snapshot(snapshot_name)
|
||||
if snapshot:
|
||||
self.os_sdk_conn.block_storage.delete_snapshot(snapshot.id)
|
||||
if debug1:
|
||||
print("deleted snapshot: " + snapshot.name + " id: " +
|
||||
snapshot.id)
|
||||
if autoclear:
|
||||
self.snapshots_clearing.remove(snapshot.id)
|
||||
|
||||
def _list_snapshots(self):
|
||||
return self.os_sdk_conn.block_storage.snapshots()
|
||||
|
||||
def _get_snapshot(self, snapshot_name):
|
||||
return self.os_sdk_conn.block_storage.get_snapshot(
|
||||
self.os_sdk_conn.block_storage.find_snapshot(
|
||||
snapshot_name,
|
||||
ignore_missing=False
|
||||
)
|
||||
)
|
||||
|
||||
def _update_snapshot(self, snapshot_name, **kwargs):
|
||||
snapshot = self.cc.volume_snapshots.get(
|
||||
self._get_snapshot(snapshot_name).id
|
||||
)
|
||||
self.cc.volume_snapshots.update(snapshot, **kwargs)
|
||||
|
||||
def _get_snapshot_metadata(self, snapshot_name):
|
||||
return self.os_sdk_conn.block_storage.get_snapshot(
|
||||
self.os_sdk_conn.block_storage.find_snapshot(
|
||||
snapshot_name,
|
||||
ignore_missing=False
|
||||
)
|
||||
).metadata
|
||||
|
||||
def _update_snapshot_metadata(self, snapshot_name, metadata):
|
||||
snapshot = self.cc.volume_snapshots.get(
|
||||
self._get_snapshot(snapshot_name).id
|
||||
)
|
||||
self.cc.volume_snapshots.set_metadata(snapshot, metadata)
|
||||
|
||||
def _delete_snapshot_metadata(self, snapshot_name, *metadata_keys):
|
||||
snapshot = self.cc.volume_snapshots.get(
|
||||
self._get_snapshot(snapshot_name).id
|
||||
)
|
||||
self.cc.volume_snapshots.delete_metadata(snapshot, metadata_keys)
|
424
enhanced-policies/tests/rbac_test_base.py
Normal file
424
enhanced-policies/tests/rbac_test_base.py
Normal file
@ -0,0 +1,424 @@
|
||||
#
|
||||
# Copyright (c) 2021 Wind River Systems, Inc.
|
||||
#
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
#
|
||||
# All Rights Reserved.
|
||||
#
|
||||
|
||||
import os
|
||||
import unittest
|
||||
|
||||
from cinderclient import client
|
||||
from glanceclient import Client
|
||||
from keystoneauth1 import loading
|
||||
from keystoneauth1 import session
|
||||
import openstack
|
||||
import os_client_config
|
||||
import pytest
|
||||
|
||||
TEST_CLOUD = os.getenv("OS_CLOUD")
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("rbac_setup")
|
||||
class TestClass(unittest.TestCase):
|
||||
os_sdk_conn = None
|
||||
|
||||
def setUp(self):
|
||||
print("\nTest initialization")
|
||||
|
||||
self.volumes_cleanup = []
|
||||
self.snapshots_cleanup = []
|
||||
self.instances_cleanup = []
|
||||
self.volume_bkps_cleanup = []
|
||||
self.images_cleanup = []
|
||||
|
||||
self.os_sdk_admin_conn = openstack.connect(cloud=TEST_CLOUD)
|
||||
|
||||
# Create users
|
||||
self.user11 = {
|
||||
"name": "user11",
|
||||
"password": "User11@Project1",
|
||||
"project": "project1"
|
||||
}
|
||||
self.user12 = {
|
||||
"name": "user12",
|
||||
"password": "User12@Project1",
|
||||
"project": "project1"
|
||||
}
|
||||
self.user13 = {
|
||||
"name": "user13",
|
||||
"password": "User13@Project1",
|
||||
"project": "project1"
|
||||
}
|
||||
self.user21 = {
|
||||
"name": "user21",
|
||||
"password": "User21@Project2",
|
||||
"project": "project2"
|
||||
}
|
||||
self.user22 = {
|
||||
"name": "user22",
|
||||
"password": "User22@Project2",
|
||||
"project": "project2"
|
||||
}
|
||||
self.user23 = {
|
||||
"name": "user23",
|
||||
"password": "User23@Project2",
|
||||
"project": "project2"
|
||||
}
|
||||
|
||||
self.users = (
|
||||
self.user11,
|
||||
self.user12,
|
||||
self.user13,
|
||||
self.user21,
|
||||
self.user22,
|
||||
self.user23
|
||||
)
|
||||
|
||||
# Tear down
|
||||
def tearDown(self):
|
||||
for instance_id in self.instances_cleanup:
|
||||
instance = self.os_sdk_admin_conn.compute.find_server(instance_id)
|
||||
self.os_sdk_admin_conn.compute.delete_server(instance, force=True)
|
||||
self.os_sdk_admin_conn.compute.wait_for_delete(instance)
|
||||
|
||||
for image_id in self.images_cleanup:
|
||||
image = self.os_sdk_admin_conn.image.find_image(image_id)
|
||||
self.os_sdk_admin_conn.image.delete_image(image)
|
||||
|
||||
for snap_id in self.snapshots_cleanup:
|
||||
snap = self.os_sdk_admin_conn.block_storage.find_snapshot(snap_id)
|
||||
self.os_sdk_admin_conn.block_storage.delete_snapshot(snap)
|
||||
self.os_sdk_admin_conn.block_storage.wait_for_delete(snap)
|
||||
|
||||
for bkp_id in self.volume_bkps_cleanup:
|
||||
bkp = self.os_sdk_admin_conn.block_storage.find_backup(bkp_id)
|
||||
self.os_sdk_admin_conn.delete_volume_backup(bkp)
|
||||
self.os_sdk_admin_conn.block_storage.wait_for_delete(bkp)
|
||||
|
||||
for vol_id in self.volumes_cleanup:
|
||||
vol = self.os_sdk_admin_conn.block_storage.find_volume(vol_id)
|
||||
self.os_sdk_admin_conn.delete_volume(vol, force=True)
|
||||
self.os_sdk_admin_conn.block_storage.wait_for_delete(vol)
|
||||
|
||||
def _get_session_for_user(self, user):
|
||||
creds = os_client_config.OpenStackConfig()\
|
||||
.get_one_cloud(cloud=TEST_CLOUD)\
|
||||
.get_auth_args()
|
||||
loader = loading.get_plugin_loader("password")
|
||||
auth = loader.load_from_options(
|
||||
auth_url=self.os_sdk_admin_conn.auth.get("auth_url"),
|
||||
username=user.get("name"),
|
||||
password=user.get("password"),
|
||||
project_name=user.get("project"),
|
||||
project_domain_name=creds['project_domain_name'],
|
||||
user_domain_name=creds['user_domain_name'],
|
||||
)
|
||||
return session.Session(auth=auth)
|
||||
|
||||
def _get_conn_for(self, user):
|
||||
self.sess = self._get_session_for_user(user)
|
||||
return openstack.connection.Connection(session=self.sess)
|
||||
|
||||
#Cinder
|
||||
def _get_cclient_for(self, user):
|
||||
self.sess = self._get_session_for_user(user)
|
||||
return client.Client('3', session=self.sess, http_log_debug=True)
|
||||
|
||||
#Glance
|
||||
def _get_gclient_for(self, user):
|
||||
self.sess = self._get_session_for_user(user)
|
||||
return Client('2', session=self.sess)
|
||||
|
||||
def set_connections_for_user(self, user):
|
||||
self.os_sdk_conn = self._get_conn_for(user)
|
||||
self.cinderclient = self._get_cclient_for(user)
|
||||
self.gc = self._get_gclient_for(user)
|
||||
|
||||
# Volume methods
|
||||
def _create_volume(self, volume_name):
|
||||
vol = self.os_sdk_conn.block_storage.create_volume(
|
||||
name=volume_name,
|
||||
size=1,
|
||||
image="cirros",
|
||||
wait=True
|
||||
)
|
||||
self.os_sdk_conn.block_storage.wait_for_status(vol, status="available")
|
||||
self.volumes_cleanup.append(vol.id)
|
||||
return vol
|
||||
|
||||
def _list_volumes(self):
|
||||
volumes = self.cinderclient.volumes.list()
|
||||
return volumes
|
||||
|
||||
def _get_volume(self, volume_name):
|
||||
volume = self.os_sdk_conn.block_storage.find_volume(
|
||||
volume_name,
|
||||
ignore_missing=False
|
||||
)
|
||||
return self.os_sdk_conn.block_storage.get_volume(volume)
|
||||
|
||||
def _update_volume(self, volume_name, **kwargs):
|
||||
vol = self.os_sdk_conn.update_volume(volume_name, **kwargs)
|
||||
return vol
|
||||
|
||||
def _get_volume_metadata(self, volume_name):
|
||||
vol = self.cinderclient.volumes.get(self._get_volume(volume_name).id)
|
||||
# NOTE(tbrito): cinderclient doesn't call /v3/{project_id}/volumes/{volume_id}/metadata explicitly
|
||||
return vol.metadata
|
||||
|
||||
def _update_volume_metadata(self, volume_name, metadata):
|
||||
vol = self.cinderclient.volumes.get(self._get_volume(volume_name).id)
|
||||
# TODO: Refactor after https://review.opendev.org/c/openstack/openstacksdk/+/777801 merges
|
||||
return self.cinderclient.volumes.set_metadata(vol, metadata)
|
||||
|
||||
def _delete_volume_metadata(self, volume_name, metadata_keys: list):
|
||||
vol = self.cinderclient.volumes.get(self._get_volume(volume_name).id)
|
||||
return self.cinderclient.volumes.delete_metadata(vol, metadata_keys)
|
||||
|
||||
def _set_volume_readonly_flag(self, volume_name, readonly=True):
|
||||
vol = self.cinderclient.volumes.get(self._get_volume(volume_name).id)
|
||||
return self.cinderclient.volumes.update_readonly_flag(vol, readonly)
|
||||
|
||||
def _retype_volume(self, volume_name, volume_type, migration_policy="never"):
|
||||
vol = self.cinderclient.volumes.get(self._get_volume(volume_name).id)
|
||||
return self.cinderclient.volumes.retype(vol, volume_type, migration_policy)
|
||||
|
||||
def _extend_volume(self, volume_name, size):
|
||||
vol = self._get_volume(volume_name)
|
||||
# NOTE(tbrito): Can't use SDK method to extend because it doesn't raise
|
||||
# exceptions, only get message
|
||||
# self.os_sdk_conn.block_storage.extend_volume(vol, size=size)
|
||||
self.cinderclient.volumes.extend(vol, size)
|
||||
vol = self.os_sdk_conn.block_storage.get_volume(vol)
|
||||
self.os_sdk_conn.block_storage.wait_for_status(vol, status="available")
|
||||
return self._get_volume(volume_name)
|
||||
|
||||
def _delete_volume(self, volume_name, **kwargs):
|
||||
vol = self.os_sdk_conn.block_storage.find_volume(volume_name, ignore_missing=False)
|
||||
self.os_sdk_conn.block_storage.delete_volume(vol)
|
||||
self.volumes_cleanup.remove(vol.id)
|
||||
|
||||
# Volume transfer methods
|
||||
def _start_volume_transfer(self, volume_name, transfer_name):
|
||||
volume = self._get_volume(volume_name)
|
||||
return self.cinderclient.transfers.create(volume.id, transfer_name)
|
||||
|
||||
def _get_volume_transfer(self, transfer_name):
|
||||
return self.cinderclient.transfers.get(
|
||||
self.cinderclient.transfers.find(name=transfer_name).id
|
||||
)
|
||||
|
||||
def _accept_volume_transfer(self, transfer_id, auth_key):
|
||||
return self.cinderclient.transfers.accept(transfer_id, auth_key)
|
||||
|
||||
def _list_volume_transfers(self):
|
||||
return self.cinderclient.transfers.list()
|
||||
|
||||
def _delete_volume_transfer(self, transfer_name):
|
||||
return self.cinderclient.transfers.delete(
|
||||
self.cinderclient.transfers.find(name=transfer_name).id
|
||||
)
|
||||
|
||||
# Volume backup methods
|
||||
def _create_volume_backup(self, volume_name, backup_name):
|
||||
vol = self._get_volume(volume_name)
|
||||
bkp = self.os_sdk_conn.block_storage.create_backup(
|
||||
volume_id=vol.id,
|
||||
name=backup_name
|
||||
)
|
||||
self.os_sdk_conn.block_storage.wait_for_status(bkp, status="available")
|
||||
self.volume_bkps_cleanup.append(bkp.id)
|
||||
return bkp
|
||||
|
||||
def _get_volume_backup(self, backup_name):
|
||||
return self.os_sdk_conn.block_storage.get_backup(self.os_sdk_conn.block_storage.find_backup(backup_name))
|
||||
|
||||
def _restore_volume_backup(self, backup_name, new_volume_name):
|
||||
bkp = self._get_volume_backup(backup_name)
|
||||
self.os_sdk_conn.block_storage.restore_backup(bkp, name=new_volume_name)
|
||||
bkp = self._get_volume_backup(backup_name)
|
||||
self.os_sdk_conn.block_storage.wait_for_status(bkp, status="available")
|
||||
volume = self._get_volume(new_volume_name)
|
||||
self.volumes_cleanup.append(volume.id)
|
||||
return volume
|
||||
|
||||
def _delete_volume_backup(self, backup_name):
|
||||
bkp = self._get_volume_backup(backup_name)
|
||||
self.os_sdk_conn.block_storage.delete_backup(bkp)
|
||||
self.volume_bkps_cleanup.remove(bkp.id)
|
||||
|
||||
# Server methods
|
||||
def _create_server(self, server_name, image_name, flavor_name):
|
||||
image = self.os_sdk_conn.image.find_image(image_name)
|
||||
flavor = self.os_sdk_conn.compute.find_flavor(flavor_name)
|
||||
server = self.os_sdk_conn.compute.create_server(
|
||||
name=server_name,
|
||||
image_id=image.id,
|
||||
flavor_id=flavor.id,
|
||||
networks=[],
|
||||
autoip=False
|
||||
)
|
||||
self.instances_cleanup.append(server.id)
|
||||
return self.os_sdk_conn.compute.wait_for_server(server)
|
||||
|
||||
def _update_server(self, server, **kwargs):
|
||||
self.os_sdk_conn.compute.update_server(server, **kwargs)
|
||||
|
||||
def _get_server(self, server_name):
|
||||
return self.os_sdk_conn.compute.get_server(
|
||||
self.os_sdk_conn.compute.find_server(server_name)
|
||||
)
|
||||
|
||||
def _add_volume_to_server(self, server_name, volume_name):
|
||||
server = self._get_server(server_name)
|
||||
volume = self._get_volume(volume_name)
|
||||
self.os_sdk_conn.compute.create_volume_attachment(
|
||||
server,
|
||||
volume_id=volume.id
|
||||
)
|
||||
self.os_sdk_conn.block_storage.wait_for_status(volume, status="in-use")
|
||||
|
||||
def _remove_volume_from_server(self, volume_name, server_name):
|
||||
server = self._get_server(server_name)
|
||||
volume = self._get_volume(volume_name)
|
||||
for attached_volume in server.attached_volumes:
|
||||
if attached_volume.get("id") == volume.id:
|
||||
self.os_sdk_conn.compute.delete_volume_attachment(
|
||||
attached_volume.get("id"),
|
||||
server
|
||||
)
|
||||
self.os_sdk_conn.block_store.wait_for_status(
|
||||
volume,
|
||||
status='available',
|
||||
failures=['error'],
|
||||
wait=360
|
||||
)
|
||||
|
||||
# Snapshot methods
|
||||
def _create_snapshot(self, volume_name, name, **kwargs):
|
||||
volume = self._get_volume(volume_name)
|
||||
snapshot = self.os_sdk_conn.block_storage.create_snapshot(
|
||||
volume_id=volume.id,
|
||||
name=name,
|
||||
**kwargs
|
||||
)
|
||||
self.os_sdk_conn.block_storage.wait_for_status(
|
||||
snapshot,
|
||||
status="available"
|
||||
)
|
||||
self.snapshots_cleanup.append(snapshot.id)
|
||||
return snapshot
|
||||
|
||||
def _list_snapshots(self):
|
||||
return self.os_sdk_conn.block_storage.snapshots()
|
||||
|
||||
def _get_snapshot(self, snapshot_name):
|
||||
return self.os_sdk_conn.block_storage.get_snapshot(
|
||||
self.os_sdk_conn.block_storage.find_snapshot(
|
||||
snapshot_name,
|
||||
ignore_missing=False
|
||||
)
|
||||
)
|
||||
|
||||
def _update_snapshot(self, snapshot_name, **kwargs):
|
||||
snapshot = self.cinderclient.volume_snapshots.get(
|
||||
self._get_snapshot(snapshot_name).id
|
||||
)
|
||||
self.cinderclient.volume_snapshots.update(snapshot, **kwargs)
|
||||
|
||||
def _get_snapshot_metadata(self, snapshot_name):
|
||||
return self.os_sdk_conn.block_storage.get_snapshot(
|
||||
self.os_sdk_conn.block_storage.find_snapshot(
|
||||
snapshot_name,
|
||||
ignore_missing=False
|
||||
)
|
||||
).metadata
|
||||
|
||||
def _update_snapshot_metadata(self, snapshot_name, metadata):
|
||||
snapshot = self.cinderclient.volume_snapshots.get(
|
||||
self._get_snapshot(snapshot_name).id
|
||||
)
|
||||
self.cinderclient.volume_snapshots.set_metadata(snapshot, metadata)
|
||||
|
||||
def _delete_snapshot_metadata(self, snapshot_name, *metadata_keys):
|
||||
snapshot = self.cinderclient.volume_snapshots.get(
|
||||
self._get_snapshot(snapshot_name).id
|
||||
)
|
||||
self.cinderclient.volume_snapshots.delete_metadata(
|
||||
snapshot,
|
||||
metadata_keys
|
||||
)
|
||||
|
||||
def _delete_snapshot(self, snapshot_name):
|
||||
snapshot = self.os_sdk_conn.block_storage.find_snapshot(snapshot_name)
|
||||
self.os_sdk_conn.block_storage.delete_snapshot(snapshot)
|
||||
self.snapshots_cleanup.remove(snapshot.id)
|
||||
|
||||
# Image methods - Glance
|
||||
def _create_image_from_volume(self, volume_name, image_name):
|
||||
volume = self._get_volume(volume_name)
|
||||
self.cinderclient.volumes.upload_to_image(
|
||||
volume,
|
||||
False,
|
||||
image_name,
|
||||
container_format="bare",
|
||||
disk_format="raw"
|
||||
)
|
||||
image = self._get_image_by_name(image_name)
|
||||
self.images_cleanup.append(image.id)
|
||||
return image
|
||||
|
||||
def _get_image_by_name(self, image_name):
|
||||
return self.os_sdk_conn.image.get_image(
|
||||
self.os_sdk_conn.image.find_image(image_name, ignore_missing=False)
|
||||
)
|
||||
|
||||
def _get_image_by_id(self, image_id):
|
||||
return self.os_sdk_conn.image.get_image(image_id)
|
||||
|
||||
def _create_image(self, image_name, filename=None, admin=False,
|
||||
disk_format="qcow2", container_format="bare",
|
||||
visibility="private", wait=True, timeout=3*60):
|
||||
os_sdk_conn = self.os_sdk_conn
|
||||
if admin:
|
||||
os_sdk_conn = self.os_sdk_admin_conn
|
||||
image = os_sdk_conn.image.create_image(
|
||||
name=image_name,
|
||||
filename=filename,
|
||||
container_format=container_format,
|
||||
disk_format=disk_format,
|
||||
visibility=visibility,
|
||||
wait=wait,
|
||||
timeout=timeout
|
||||
)
|
||||
self.images_cleanup.append(image.id)
|
||||
return image
|
||||
|
||||
def _upload_image(self, image_id, filename):
|
||||
image = self.gc.images.upload(
|
||||
image_id,
|
||||
open(filename, 'rb')
|
||||
)
|
||||
return image
|
||||
|
||||
def _delete_image(self, image):
|
||||
self.os_sdk_conn.image.delete_image(image)
|
||||
self.images_cleanup.remove(image.id)
|
||||
|
||||
def _list_images(self):
|
||||
return self.gc.images.list()
|
||||
|
||||
def _update_image(self, image, **attrs):
|
||||
self.os_sdk_conn.image.update_image(image, **attrs)
|
||||
return self._get_image_by_id(image.id)
|
||||
|
||||
def _download_image(self, image):
|
||||
return self.os_sdk_conn.image.download_image(image, stream=True)
|
||||
|
||||
def _deactivate_image(self, image):
|
||||
self.os_sdk_conn.image.deactivate_image(image.id)
|
||||
|
||||
def _reactivate_image(self, image):
|
||||
self.os_sdk_conn.image.reactivate_image(image.id)
|
0
enhanced-policies/tests/test_cinder/__init__.py
Normal file
0
enhanced-policies/tests/test_cinder/__init__.py
Normal file
698
enhanced-policies/tests/test_cinder/test_rbac_cinder.py
Normal file
698
enhanced-policies/tests/test_cinder/test_rbac_cinder.py
Normal file
@ -0,0 +1,698 @@
|
||||
#
|
||||
# Copyright (c) 2021 Wind River Systems, Inc.
|
||||
#
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
#
|
||||
# All Rights Reserved.
|
||||
#
|
||||
|
||||
from tests import rbac_test_base
|
||||
|
||||
class TestBlockStorage(rbac_test_base.TestClass):
|
||||
|
||||
def test_uc_volume_1(self):
|
||||
"""
|
||||
1. user11 can create a volume from an image
|
||||
2. usr11 can list/detail the volume
|
||||
3. user11 can create metadata on the volume
|
||||
4. user11 can update/delete metadata
|
||||
5. user11 cant extend the volume when detached
|
||||
6. user11 can update readonly flag of the volume
|
||||
7. user11 can retype the volume
|
||||
8. user11 can attach/dettach the volume
|
||||
"""
|
||||
self.set_connections_for_user(self.user11)
|
||||
|
||||
# 1. user11 can create a volume from an image
|
||||
self._create_volume("volume11")
|
||||
# 2. usr11 can list/detail the volume
|
||||
volumes = self._list_volumes()
|
||||
self.assertIn("volume11", [v.name for v in volumes])
|
||||
|
||||
# 3. user11 can create metadata on the volume
|
||||
self._update_volume_metadata("volume11", metadata={"my": "test"})
|
||||
volume11 = self._get_volume("volume11")
|
||||
self.assertIn("my", volume11.metadata)
|
||||
self.assertEqual(volume11.metadata.get("my"), "test")
|
||||
|
||||
# 4. user11 can update/delete metadata
|
||||
self._update_volume_metadata("volume11", metadata={"my": "test2"})
|
||||
volume11 = self._get_volume("volume11")
|
||||
self.assertIn("my", volume11.metadata)
|
||||
self.assertEqual(volume11.metadata.get("my"), "test2")
|
||||
|
||||
self._delete_volume_metadata("volume11", ["my"])
|
||||
volume11 = self._get_volume("volume11")
|
||||
self.assertNotIn("my", volume11.metadata)
|
||||
|
||||
# 5. user11 cant extend the volume when detached
|
||||
volume11 = self._extend_volume("volume11", 2)
|
||||
self.assertEqual(volume11.size, 2)
|
||||
|
||||
# 6. user11 can update readonly flag of the volume
|
||||
# TODO(tbrito): Fix after merge of https://review.opendev.org/c/openstack/openstacksdk/+/776266
|
||||
self._set_volume_readonly_flag("volume11", readonly=True)
|
||||
volume11 = self._get_volume("volume11")
|
||||
self.assertTrue(volume11.metadata.get("readonly"))
|
||||
|
||||
# 7. user11 can retype the volume
|
||||
# TODO(tbrito): Fix after merge of https://review.opendev.org/c/openstack/openstacksdk/+/776272
|
||||
self._retype_volume("volume11", volume_type="rbd1")
|
||||
volume11 = self._get_volume("volume11")
|
||||
# TODO(tbrito): Req accepted but volume doesn't change? Figure out why
|
||||
# self.assertEquals(volume11.volume_type, "rbd1")
|
||||
|
||||
# 8. user11 can attach/detach the volume
|
||||
self._create_server("instance11", image_name="cirros", flavor_name="m1.tiny")
|
||||
self._add_volume_to_server("instance11", "volume11")
|
||||
instance11 = self._get_server("instance11")
|
||||
self.assertIn(volume11.id, [v.get("id") for v in instance11.attached_volumes])
|
||||
|
||||
self._remove_volume_from_server("volume11", "instance11")
|
||||
instance11 = self._get_server("instance11")
|
||||
self.assertEqual(instance11.attached_volumes, [])
|
||||
|
||||
def test_uc_volume_2(self):
|
||||
"""
|
||||
1. user12 can create volume from an image
|
||||
2. user12 cannot delete the volume
|
||||
3. use12 can list/details the volume it created
|
||||
4. user 12 can create metadata of the volumes
|
||||
5. user 12 can not update/delete metadata of the volumes
|
||||
6. user12 can get list/detail of metadata of volumes of project1
|
||||
7. user12 cannot extend the volume
|
||||
8. user12 can attach/detach the volume to an instance
|
||||
"""
|
||||
self.set_connections_for_user(self.user12)
|
||||
|
||||
# 1. user12 can create volume form an image
|
||||
self._create_volume("volume12")
|
||||
volumes = self._list_volumes()
|
||||
self.assertIn("volume12", [v.name for v in volumes])
|
||||
|
||||
# 2. user12 cannot delete the volume
|
||||
self.assertRaisesRegex(
|
||||
Exception,
|
||||
"Policy doesn't allow volume:delete to be performed",
|
||||
self._delete_volume,
|
||||
"volume12"
|
||||
)
|
||||
|
||||
# 3. use12 can list/details the volume it created
|
||||
volumes = self._list_volumes()
|
||||
self.assertIn("volume12", [v.name for v in volumes])
|
||||
self._get_volume("volume12")
|
||||
|
||||
# 4. user 12 can create metadata of the volumes
|
||||
self._update_volume_metadata("volume12", metadata={"my": "test"})
|
||||
volume12 = self._get_volume("volume12")
|
||||
self.assertIn("my", volume12.metadata)
|
||||
self.assertEqual(volume12.metadata.get("my"), "test")
|
||||
|
||||
# 5. user12 can not update/delete metadata of the volumes
|
||||
# NOTE(tbrito): cinderclient.set_metadata uses the POST endpoint, so it's not possible to verify that atm
|
||||
# self.assertRaises(
|
||||
# Exception,
|
||||
# self._update_volume_metadata,
|
||||
# "volume12",
|
||||
# metadata={"my": "test2"}
|
||||
# )
|
||||
|
||||
self.assertRaisesRegex(
|
||||
Exception,
|
||||
"Policy doesn't allow volume:delete_volume_metadata to be performed",
|
||||
self._delete_volume_metadata,
|
||||
"volume12",
|
||||
["my"]
|
||||
)
|
||||
|
||||
# 6. user12 can get list/detail of metadata of volumes of project1
|
||||
metadata = self._get_volume_metadata("volume12")
|
||||
self.assertIn("my", volume12.metadata)
|
||||
self.assertEqual(metadata.get("my"), "test")
|
||||
|
||||
# 7. user12 cannot extend the volume
|
||||
self.assertRaisesRegex(
|
||||
Exception,
|
||||
"Policy doesn't allow volume:extend to be performed",
|
||||
self._extend_volume,
|
||||
"volume12",
|
||||
2
|
||||
)
|
||||
|
||||
# 8. user12 can attach/detach the volume to an instance
|
||||
self._create_server("instance12", image_name="cirros", flavor_name="m1.tiny")
|
||||
self._add_volume_to_server("instance12", "volume12")
|
||||
instance12 = self._get_server("instance12")
|
||||
self.assertIn(volume12.id, [v.get("id") for v in instance12.attached_volumes])
|
||||
|
||||
self._remove_volume_from_server("volume12", "instance12")
|
||||
instance12 = self._get_server("instance12")
|
||||
self.assertEqual(instance12.attached_volumes, [])
|
||||
|
||||
def test_uc_volume_3(self):
|
||||
"""
|
||||
1. user13 cannot create/delete/update volumes of project1
|
||||
2. user13 can list/details the volumes of project1
|
||||
3. user13 cannot add/update/delete metadata of volumes
|
||||
4. user13 can show metadata of volumes
|
||||
5. user13 cannot update readonly flag of the volumes
|
||||
"""
|
||||
self.set_connections_for_user(self.user11)
|
||||
self._create_volume("volume11")
|
||||
self._update_volume_metadata("volume11", metadata={"my-11": "test-11"})
|
||||
self.set_connections_for_user(self.user13)
|
||||
|
||||
# 1. user13 cannot create/delete/update volumes of project1
|
||||
self.assertRaisesRegex(
|
||||
Exception,
|
||||
"Policy doesn't allow volume:create to be performed",
|
||||
self._create_volume,
|
||||
"volume13"
|
||||
)
|
||||
self.assertRaisesRegex(
|
||||
Exception,
|
||||
"Policy doesn't allow volume:delete to be performed",
|
||||
self._delete_volume,
|
||||
"volume11"
|
||||
)
|
||||
# NOTE(tbrito): cinderclient.set_metadata uses the POST endpoint, so it's not possible to verify that atm
|
||||
self.assertRaisesRegex(
|
||||
Exception,
|
||||
"Policy doesn't allow volume:update to be performed",
|
||||
self._update_volume,
|
||||
"volume11",
|
||||
name="THIS IS VOLUME 13"
|
||||
)
|
||||
|
||||
# 2. user13 can list/details the volumes of project1
|
||||
volumes = self._list_volumes()
|
||||
self.assertIn("volume11", [v.name for v in volumes])
|
||||
|
||||
volume11 = self._get_volume("volume11")
|
||||
self.assertEqual(volume11.status, "available")
|
||||
|
||||
# 3. user13 cannot add/update/delete metadata of volumes
|
||||
# NOTE(tbrito): cinderclient.set_metadata uses the POST endpoint, so
|
||||
# it's not possible to verify that atm
|
||||
# self.assertRaises(
|
||||
# Exception,
|
||||
# self._update_volume_metadata,
|
||||
# "volume11",
|
||||
# metadata={"my": "test"}
|
||||
# )
|
||||
|
||||
self.assertRaisesRegex(
|
||||
Exception,
|
||||
"Policy doesn't allow volume:delete_volume_metadata to be performed",
|
||||
self._delete_volume_metadata,
|
||||
"volume11",
|
||||
["my-11"]
|
||||
)
|
||||
|
||||
# 4. user13 can show metadata of volumes
|
||||
volume11 = self._get_volume("volume11")
|
||||
self.assertDictEqual(volume11.metadata, {"my-11": "test-11"})
|
||||
|
||||
# 5. user13 cannot update readonly flag of the volumes
|
||||
# TODO(tbrito): Fix after merge of https://review.opendev.org/c/openstack/openstacksdk/+/776266
|
||||
self.assertRaisesRegex(
|
||||
Exception,
|
||||
"Policy doesn't allow volume:update_readonly_flag to be performed",
|
||||
self._set_volume_readonly_flag,
|
||||
"volume11",
|
||||
readonly=True
|
||||
)
|
||||
|
||||
def test_uc_volume_4(self):
|
||||
"""
|
||||
user11/12/13 as members of project1,
|
||||
1. cannot get list/detail of volumes of project2
|
||||
2. cannot update/delete volumes of project2
|
||||
3. cannot force delete volumes of project2
|
||||
"""
|
||||
self.set_connections_for_user(self.user21)
|
||||
self._create_volume("volume21")
|
||||
|
||||
for user in (self.user11, self.user12, self.user13):
|
||||
self.set_connections_for_user(user)
|
||||
|
||||
# 1. cannot get list/detail of volumes of project2,
|
||||
self.assertNotIn("volume21", [v.name for v in self._list_volumes()])
|
||||
|
||||
self.assertRaisesRegex(
|
||||
Exception,
|
||||
"No Volume found for volume21",
|
||||
self._get_volume,
|
||||
"volume21"
|
||||
)
|
||||
|
||||
# 2. cannot update/delete volumes of project2
|
||||
self.assertRaisesRegex(
|
||||
Exception,
|
||||
"No Volume found for volume21",
|
||||
self._update_volume_metadata,
|
||||
"volume21",
|
||||
metadata={"my": "test"}
|
||||
)
|
||||
self.assertRaisesRegex(
|
||||
Exception,
|
||||
"No Volume found for volume21",
|
||||
self._delete_volume,
|
||||
"volume21"
|
||||
)
|
||||
|
||||
# 3. cannot force delete volumes of project2
|
||||
self.assertRaisesRegex(
|
||||
Exception,
|
||||
"No Volume found for volume21",
|
||||
self._delete_volume,
|
||||
"volume21",
|
||||
force=True
|
||||
)
|
||||
|
||||
def test_uc_snapshot_1(self):
|
||||
"""
|
||||
1. user11 create a snapshot of volume with metadata when the voluem is detached.
|
||||
2. user11 can list/detail metadata of snapshot of project1
|
||||
3. user11 can update/delete the metadata of snapshot
|
||||
4. user11 can detail the snapshot of project1
|
||||
5. user11 can update/delete snapshot
|
||||
6. user11 can create a snapshot of the volume when it is attached
|
||||
"""
|
||||
self.set_connections_for_user(self.user11)
|
||||
self._create_volume("volume11")
|
||||
|
||||
# 1. user11 create a snapshot of volume with metadata when the volume is detached.
|
||||
self._create_snapshot(volume_name="volume11", name="snapshot11", description="snapshot11yeah",
|
||||
metadata={"my": "test"})
|
||||
|
||||
# TODO(tbrito): https://review.opendev.org/c/openstack/openstacksdk/+/778757
|
||||
# 2. user11 can list/detail metadata of snapshot of project1
|
||||
metadata = self._get_snapshot_metadata("snapshot11")
|
||||
self.assertIn("my", [k for k, v in metadata.items()])
|
||||
|
||||
# 3. user11 can update/delete the metadata of snapshot
|
||||
self._update_snapshot_metadata("snapshot11", metadata={"my": "test2"})
|
||||
metadata = self._get_snapshot_metadata("snapshot11")
|
||||
self.assertIn("test2", metadata.get("my"))
|
||||
self._delete_snapshot_metadata("snapshot11", "my")
|
||||
metadata = self._get_snapshot_metadata("snapshot11")
|
||||
self.assertNotIn("my", metadata)
|
||||
|
||||
# 4. user11 can detail the snapshot of project1
|
||||
snapshot = self._get_snapshot("snapshot11")
|
||||
self.assertEqual("snapshot11yeah", snapshot.description)
|
||||
|
||||
# 5. user11 can update/delete snapshot
|
||||
# TODO(tbrito):
|
||||
self._update_snapshot("snapshot11", description="My test description")
|
||||
snapshot = self._get_snapshot("snapshot11")
|
||||
self.assertEqual("My test description", snapshot.description)
|
||||
self._delete_snapshot("snapshot11")
|
||||
|
||||
# 6. user11 can create a snapshot of the volume when it is attached
|
||||
self._create_server("instance11", image_name="cirros", flavor_name="m1.tiny")
|
||||
self._add_volume_to_server("instance11", "volume11")
|
||||
self._create_snapshot(volume_name="volume11", name="snapshot11.2", force=True)
|
||||
|
||||
def test_uc_snapshot_2(self):
|
||||
"""
|
||||
1. user12 create a snapshot of volume with metadata when the volume is detached
|
||||
2. user12 can list/detail metadata of snapshot of project1
|
||||
3. user12 cannot update/delete the metadata of snapshot
|
||||
4. user12 can detail the snapshot of project1
|
||||
5. user12 cannot update/delete snapshot
|
||||
6. user12 can create a snapshot of the volume wht it is attached
|
||||
"""
|
||||
self.set_connections_for_user(self.user11)
|
||||
self._create_volume("volume11")
|
||||
self._create_snapshot(volume_name="volume11", name="snapshot11", description="snapshot11yeah",
|
||||
metadata={"my": "test"})
|
||||
self.set_connections_for_user(self.user12)
|
||||
self._create_volume("volume12")
|
||||
|
||||
# 1. user12 create a snapshot of volume with metadata when the volume is detached.
|
||||
self._create_snapshot(volume_name="volume12", name="snapshot12", metadata={"my2": "test2"})
|
||||
|
||||
# TODO(tbrito): https://review.opendev.org/c/openstack/openstacksdk/+/778757
|
||||
# 2. user12 can list/detail metadata of snapshot of project1
|
||||
metadata = self._get_snapshot_metadata("snapshot11")
|
||||
self.assertIn("my", [k for k, v in metadata.items()])
|
||||
|
||||
# 3. user12 cannot update/delete the metadata of snapshot
|
||||
self.assertRaisesRegex(
|
||||
Exception,
|
||||
"Policy doesn't allow volume:update_snapshot_metadata to be performed",
|
||||
self._update_snapshot_metadata,
|
||||
"snapshot11",
|
||||
metadata={"my": "test2"}
|
||||
)
|
||||
self.assertRaisesRegex(
|
||||
Exception,
|
||||
"Policy doesn't allow volume:delete_snapshot_metadata to be performed",
|
||||
self._delete_snapshot_metadata,
|
||||
"snapshot11",
|
||||
"my"
|
||||
)
|
||||
|
||||
# 4. user12 can detail the snapshot of project1
|
||||
snapshot = self._get_snapshot("snapshot11")
|
||||
self.assertEqual("snapshot11yeah", snapshot.description)
|
||||
|
||||
# 5. user12 cannot update/delete snapshot
|
||||
self.assertRaisesRegex(
|
||||
Exception,
|
||||
"Policy doesn't allow volume:update_snapshot to be performed",
|
||||
self._update_snapshot,
|
||||
"snapshot11",
|
||||
description="My test description"
|
||||
)
|
||||
self.assertRaisesRegex(
|
||||
Exception,
|
||||
"Policy doesn't allow volume:delete_snapshot to be performed",
|
||||
self._delete_snapshot,
|
||||
"snapshot11"
|
||||
)
|
||||
|
||||
# 6. user12 can create a snapshot of the volume when it is attached
|
||||
self._create_server("instance12", image_name="cirros", flavor_name="m1.tiny")
|
||||
# NOTE: user12 cannot create attachment due to os_compute_api:os-volumes-attachmentos_compute_api:os-volumes-attachments:create
|
||||
# Using user11 instead
|
||||
self.set_connections_for_user(self.user11)
|
||||
self._add_volume_to_server("instance12", "volume12")
|
||||
self.set_connections_for_user(self.user12)
|
||||
self._create_snapshot(volume_name="volume12", name="snapshot12.2", force=True)
|
||||
|
||||
def test_uc_snapshot_3(self):
|
||||
"""
|
||||
1. user13 cannot create a snapshot of volume with metadata when the volume is detached
|
||||
2. user13 can list/detail metadata of snapshot of project1
|
||||
3. user13 cannot update/delete the metadata of snapshot
|
||||
4. user13 can detail the snapshot of project1
|
||||
5. user13 cannot update/delete snapshot
|
||||
6. user13 cannot create a snapshot of the volume when it is attached
|
||||
"""
|
||||
self.set_connections_for_user(self.user11)
|
||||
self._create_volume("volume11")
|
||||
self._create_snapshot(volume_name="volume11", name="snapshot11", description="snapshot11yeah",
|
||||
metadata={"my": "test"})
|
||||
|
||||
# 1. user13 cannot create a snapshot of volume with metadata when the volume is detached.
|
||||
self.set_connections_for_user(self.user13)
|
||||
self.assertRaisesRegex(
|
||||
Exception,
|
||||
"Policy doesn't allow volume:create_snapshot to be performed",
|
||||
self._create_snapshot,
|
||||
volume_name="volume11",
|
||||
name="snapshot13",
|
||||
metadata={"my3": "test3"}
|
||||
)
|
||||
|
||||
# TODO(tbrito):https://review.opendev.org/c/openstack/openstacksdk/+/778757
|
||||
# 2. user13 can list/detail metadata of snapshot of project1
|
||||
metadata = self._get_snapshot_metadata("snapshot11")
|
||||
self.assertIn("my", metadata)
|
||||
|
||||
# 3. user13 cannot update/delete the metadata of snapshot
|
||||
self.assertRaisesRegex(
|
||||
Exception,
|
||||
"Policy doesn't allow volume:update_snapshot_metadata to be performed",
|
||||
self._update_snapshot_metadata,
|
||||
"snapshot11",
|
||||
metadata={"my": "test2"}
|
||||
)
|
||||
self.assertRaisesRegex(
|
||||
Exception,
|
||||
"Policy doesn't allow volume:delete_snapshot_metadata to be performed",
|
||||
self._delete_snapshot_metadata,
|
||||
"snapshot11",
|
||||
"my"
|
||||
)
|
||||
|
||||
# 4. user13 can detail the snapshot of project1
|
||||
snapshot = self._get_snapshot("snapshot11")
|
||||
self.assertEqual("snapshot11yeah", snapshot.description)
|
||||
|
||||
# 5. user13 cannot update/delete snapshot
|
||||
self.assertRaisesRegex(
|
||||
Exception,
|
||||
"Policy doesn't allow volume:update_snapshot to be performed",
|
||||
self._update_snapshot,
|
||||
"snapshot11",
|
||||
description="My test description"
|
||||
)
|
||||
self.assertRaisesRegex(
|
||||
Exception,
|
||||
"Policy doesn't allow volume:delete_snapshot to be performed",
|
||||
self._delete_snapshot,
|
||||
"snapshot11"
|
||||
)
|
||||
|
||||
# 6. user13 cannot create a snapshot of the volume when it is attached
|
||||
self.set_connections_for_user(self.user11)
|
||||
self._create_server("instance11", image_name="cirros", flavor_name="m1.tiny")
|
||||
self._add_volume_to_server("instance11", "volume11")
|
||||
self.set_connections_for_user(self.user13)
|
||||
self.assertRaisesRegex(
|
||||
Exception,
|
||||
"Policy doesn't allow volume:create_snapshot to be performed",
|
||||
self._create_snapshot,
|
||||
volume_name="volume11",
|
||||
name="snapshot13"
|
||||
)
|
||||
|
||||
def test_uc_snapshot_4(self):
|
||||
"""
|
||||
1. user21 create a snapshot of a volume of project2
|
||||
2. user11/user12/user13 cannot list/details the volume
|
||||
"""
|
||||
# 1. user21 create a snapshot of a volume of project2
|
||||
self.set_connections_for_user(self.user21)
|
||||
self._create_volume("volume21")
|
||||
self._create_snapshot(volume_name="volume21", name="snapshot21", metadata={"my": "test"})
|
||||
|
||||
# 2. user11/user12/user13 cannot list/details the volume
|
||||
for user in (self.user11, self.user12, self.user13):
|
||||
self.set_connections_for_user(user)
|
||||
self.assertNotIn("volume21", self._list_snapshots())
|
||||
self.assertRaisesRegex(
|
||||
Exception,
|
||||
"No Snapshot found for volume21",
|
||||
self._get_snapshot,
|
||||
"volume21"
|
||||
)
|
||||
|
||||
def test_uc_volumeupload_1(self):
|
||||
"""
|
||||
1. user 11 can upload an image from volume of project1
|
||||
2. user11 can show the new image it uploaded
|
||||
"""
|
||||
# 1. user 11 can upload an image from volume of project1
|
||||
self.set_connections_for_user(self.user11)
|
||||
self._create_volume("volume11")
|
||||
self._create_image_from_volume(volume_name="volume11", image_name="image11")
|
||||
|
||||
# 2. user11 can show the new image it uploaded
|
||||
self._get_image_by_name("image11")
|
||||
|
||||
def test_uc_volumeupload_2(self):
|
||||
"""
|
||||
1. user12/user13 cannot upload image from volume of project1
|
||||
"""
|
||||
self.set_connections_for_user(self.user11)
|
||||
self._create_volume("volume11")
|
||||
self.set_connections_for_user(self.user12)
|
||||
self.assertRaisesRegex(
|
||||
Exception,
|
||||
"Policy doesn't allow volume_extension:volume_actions:upload_image to be performed",
|
||||
self._create_image_from_volume,
|
||||
volume_name="volume11",
|
||||
image_name="image11"
|
||||
)
|
||||
self.set_connections_for_user(self.user13)
|
||||
self.assertRaisesRegex(
|
||||
Exception,
|
||||
"Policy doesn't allow volume_extension:volume_actions:upload_image to be performed",
|
||||
self._create_image_from_volume,
|
||||
volume_name="volume11",
|
||||
image_name="image11"
|
||||
)
|
||||
|
||||
def test_uc_volumetransfer_1(self):
|
||||
""""
|
||||
1. user11 can start a volume transfer of volume of project1
|
||||
2. user11 can list/details the transfer it started
|
||||
3. user21 can accept the transfer in project2
|
||||
4. user11 can delete the transfer it started
|
||||
"""
|
||||
self.set_connections_for_user(self.user11)
|
||||
self._create_volume("volume11")
|
||||
|
||||
# 1. user11 can start a volume transfer of volume of project1
|
||||
original_transfer = self._start_volume_transfer("volume11", "Transfer volume11")
|
||||
|
||||
# 2. user11 can list/details the transfer it started
|
||||
all_transfers = self._list_volume_transfers()
|
||||
self.assertIn("Transfer volume11", [t.name for t in all_transfers])
|
||||
transfer = self._get_volume_transfer("Transfer volume11")
|
||||
self.assertEqual("Transfer volume11", transfer.name)
|
||||
|
||||
# 3. user21 can accept the transfer in project2
|
||||
self.set_connections_for_user(self.user21)
|
||||
self._accept_volume_transfer(transfer.id, original_transfer.auth_key)
|
||||
|
||||
# 4. user11 can delete the transfer it started
|
||||
self.set_connections_for_user(self.user11)
|
||||
self._create_volume("volume11.2")
|
||||
self._start_volume_transfer("volume11.2", "Transfer volume11.2")
|
||||
self._delete_volume_transfer("Transfer volume11.2")
|
||||
|
||||
def test_uc_volumetransfer_2(self):
|
||||
"""
|
||||
1. user12 cannot start a volume transfer of volume of project1
|
||||
2. user21 start a volume transfer in project2
|
||||
3. user12 can list/detail the volume transfer in project1
|
||||
4. user12 can accept the transfer in project1
|
||||
"""
|
||||
self.set_connections_for_user(self.user12)
|
||||
self._create_volume("volume12")
|
||||
|
||||
# 1. user12 cannot start a volume transfer of volume of project1
|
||||
self.assertRaisesRegex(
|
||||
Exception,
|
||||
"Policy doesn't allow volume:create_transfer to be performed",
|
||||
self._start_volume_transfer,
|
||||
"volume12",
|
||||
"Transfer volume12"
|
||||
)
|
||||
|
||||
# 2. user21 start a volume transfer in project2
|
||||
self.set_connections_for_user(self.user21)
|
||||
self._create_volume("volume21")
|
||||
transfer21 = self._start_volume_transfer("volume21", "Transfer volume21")
|
||||
|
||||
# 3. user12 can list/detail the volume transfer in project1
|
||||
all_transfers = self._list_volume_transfers()
|
||||
self.assertIn("Transfer volume21", [t.name for t in all_transfers])
|
||||
transfer = self._get_volume_transfer("Transfer volume21")
|
||||
self.assertEqual(transfer.id, transfer21.id)
|
||||
|
||||
# 4. user12 can accept the transfer in project1
|
||||
self._accept_volume_transfer(transfer.id, transfer21.auth_key)
|
||||
|
||||
def test_uc_volumetransfer_3(self):
|
||||
"""
|
||||
1. user11 start a volume transfer of volume of project1
|
||||
2. user13 can list/details the transfer it started
|
||||
3. user13 cannot start the transfer in project1
|
||||
4. user21 start a volume transfer in project2
|
||||
5. user13 cannot accept the transfer in project1
|
||||
"""
|
||||
self.set_connections_for_user(self.user11)
|
||||
self._create_volume("volume11")
|
||||
|
||||
# 1. user11 start a volume transfer of volume of project1
|
||||
self._start_volume_transfer("volume11", "Transfer volume11")
|
||||
|
||||
# 2. user13 can list/details the transfer it started
|
||||
self.set_connections_for_user(self.user13)
|
||||
all_transfers = self._list_volume_transfers()
|
||||
self.assertIn("Transfer volume11", [t.name for t in all_transfers])
|
||||
transfer = self._get_volume_transfer("Transfer volume11")
|
||||
self.assertEqual("Transfer volume11", transfer.name)
|
||||
|
||||
# 3. user13 cannot start the transfer in project1
|
||||
self.assertRaisesRegex(
|
||||
Exception,
|
||||
"Policy doesn't allow volume:create_transfer to be performed",
|
||||
self._start_volume_transfer,
|
||||
"volume11",
|
||||
"Another transfer volume11"
|
||||
)
|
||||
|
||||
# 4. user21 start a volume transfer in project2
|
||||
self.set_connections_for_user(self.user21)
|
||||
self._create_volume("volume21")
|
||||
transfer21 = self._start_volume_transfer("volume21", "Transfer volume21")
|
||||
|
||||
# 5. user13 cannot accept the transfer in project1
|
||||
self.set_connections_for_user(self.user13)
|
||||
self.assertRaisesRegex(
|
||||
Exception,
|
||||
"Policy doesn't allow volume:accept_transfer to be performed",
|
||||
self._accept_volume_transfer,
|
||||
transfer21.id,
|
||||
transfer21.auth_key
|
||||
)
|
||||
|
||||
def test_uc_volumebackup_1(self):
|
||||
"""
|
||||
1. user11/12 can create a volume backup of project1,
|
||||
2. user13 cannot create a volume backup of project1,
|
||||
3. user11/user12/user13 can list/details the created backup
|
||||
4. user 11 can restore the backup
|
||||
5. user12 CAN restore the backup (or else nova migration will fail)
|
||||
6. user12/user13 cannot restore the backup
|
||||
7. user11 can delete the backup
|
||||
8. use12/user13 cannot delete the backup
|
||||
"""
|
||||
self.set_connections_for_user(self.user11)
|
||||
self._create_volume("volume11")
|
||||
|
||||
# 1. user11/12 can create a volume backup of project1
|
||||
self._create_volume_backup("volume11", "volume11-bkp11")
|
||||
self.set_connections_for_user(self.user12)
|
||||
self._create_volume_backup("volume11", "volume11-bkp12")
|
||||
|
||||
# 2. user13 cannot create a volume backup of project1
|
||||
self.set_connections_for_user(self.user13)
|
||||
self.assertRaisesRegex(
|
||||
Exception,
|
||||
"Policy doesn't allow backup:create to be performed",
|
||||
self._create_volume_backup,
|
||||
"volume11",
|
||||
"volume11-bkp13"
|
||||
)
|
||||
|
||||
# 3. user11/user12/user13 can list/details the created backup
|
||||
for user in (self.user11, self.user12, self.user13):
|
||||
self.set_connections_for_user(user)
|
||||
self._get_volume_backup("volume11-bkp11")
|
||||
|
||||
# 4. user 11 can restore the backup
|
||||
self.set_connections_for_user(self.user11)
|
||||
self._restore_volume_backup("volume11-bkp12", "restored-volume11")
|
||||
self.assertIn("restored-volume11", [v.name for v in self._list_volumes()])
|
||||
|
||||
# 5. user12 CAN restore the backup (or else nova migration will fail)
|
||||
self.set_connections_for_user(self.user12)
|
||||
self.assertRaisesRegex(
|
||||
Exception,
|
||||
"Policy doesn't allow backup:restore to be performed",
|
||||
self._restore_volume_backup, "volume11-bkp12", "restored-volume12"
|
||||
)
|
||||
self.assertNotIn("restored-volume12", [v.name for v in self._list_volumes()])
|
||||
|
||||
# 6. user13 cannot restore the backup
|
||||
self.set_connections_for_user(self.user13)
|
||||
self.assertRaisesRegex(
|
||||
Exception,
|
||||
"Policy doesn't allow backup:restore to be performed",
|
||||
self._restore_volume_backup, "volume11-bkp12", "restored-volume13"
|
||||
)
|
||||
self.assertNotIn("restored-volume13", [v.name for v in self._list_volumes()])
|
||||
|
||||
# 7. user11 can delete the backup
|
||||
self.set_connections_for_user(self.user11)
|
||||
self._delete_volume_backup("volume11-bkp12")
|
||||
|
||||
# 8. use12/user13 cannot delete the backup
|
||||
for user in (self.user12, self.user13):
|
||||
self.set_connections_for_user(user)
|
||||
self.assertRaisesRegex(
|
||||
Exception,
|
||||
"Policy doesn't allow backup:delete to be performed",
|
||||
self._delete_volume_backup,
|
||||
"volume11-bkp11"
|
||||
)
|
50
enhanced-policies/tox.ini
Normal file
50
enhanced-policies/tox.ini
Normal file
@ -0,0 +1,50 @@
|
||||
[tox]
|
||||
minversion = 3.9.0
|
||||
envlist = policy-functional
|
||||
skipsdist = True
|
||||
ignore_basepython_conflict = True
|
||||
|
||||
[testenv]
|
||||
usedevelop = True
|
||||
skip_install = True
|
||||
basepython = python3
|
||||
setenv =
|
||||
VIRTUAL_ENV={envdir}
|
||||
LANG=en_US.UTF-8
|
||||
LANGUAGE=en_US:en
|
||||
LC_ALL=C
|
||||
OS_LOG_CAPTURE={env:OS_LOG_CAPTURE:false}
|
||||
OS_STDOUT_CAPTURE={env:OS_STDOUT_CAPTURE:true}
|
||||
OS_STDERR_CAPTURE={env:OS_STDERR_CAPTURE:true}
|
||||
deps =
|
||||
-r{toxinidir}/test-requirements.txt
|
||||
|
||||
|
||||
[testenv:policy-functional]
|
||||
commands = pytest --collect-only tests/
|
||||
|
||||
[testenv:pep8]
|
||||
deps =
|
||||
hacking>=3.1.0,<4.0.0 # Apache-2.0
|
||||
flake8-import-order>=0.17.1 # LGPLv3
|
||||
pycodestyle>=2.0.0,<2.7.0 # MIT
|
||||
Pygments>=2.2.0 # BSD
|
||||
doc8>=0.8.0 # Apache 2.0
|
||||
commands =
|
||||
flake8 {posargs}
|
||||
doc8 doc/source README.rst
|
||||
|
||||
[flake8]
|
||||
application-import-names = tests
|
||||
# The following are ignored on purpose. It's not super worth it to fix them.
|
||||
# However, if you feel strongly about it, patches will be accepted to fix them
|
||||
# if they fix ALL of the occurances of one and only one of them.
|
||||
# H238 New Style Classes are the default in Python3
|
||||
# H4 Are about docstrings and there's just a huge pile of pre-existing issues.
|
||||
# W503 Is supposed to be off by default but in the latest pycodestyle isn't.
|
||||
# Also, both openstacksdk and Donald Knuth disagree with the rule. Line
|
||||
# breaks should occur before the binary operator for readability.
|
||||
ignore = H238,H4,W503
|
||||
import-order-style = pep8
|
||||
show-source = True
|
||||
exclude=.venv,.git,.tox,dist,doc,*lib/python*,*egg,build,openstacksdk-0.55.0
|
Loading…
Reference in New Issue
Block a user