# File: openstack-armada-app/enhanced-policies/tests/test_neutron/rbac_neutron.py
# (source listing header: 190 lines, 8.8 KiB, Python)

#
# Copyright (c) 2021 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
# All Rights Reserved.
#
from tests.fv_rbac import OpenStackBasicTesting
from tests.fv_rbac import debug1
class OpenStackNetworkingTesting(OpenStackBasicTesting):
    """Networking (Neutron) helpers for the RBAC functional tests.

    Every helper is a thin wrapper around ``self.os_sdk_conn.network``.
    The class relies on attributes it does not define itself:
    ``os_sdk_conn``, ``subnet_pools_clearing``, ``address_scopes_clearing``,
    ``trunks_clearing``, ``_find_port`` and ``_find_security_group``.
    NOTE(review): these are presumably provided by ``OpenStackBasicTesting``,
    and the ``*_clearing`` lists presumably drive its cleanup -- confirm.

    Helpers taking ``name_or_id`` resolve the resource first with
    ``ignore_missing=False`` and then operate on the resolved resource.
    """

    # ------------------------------------------------------------------
    # Network IP availability
    # ------------------------------------------------------------------
    def _find_ip_availability(self, network_name_or_id, ignore_missing=True,
                              **args):
        """Find the IP availability record of a network by name or ID.

        ``ignore_missing`` and any extra keyword arguments are forwarded
        unchanged to ``find_network_ip_availability``.
        """
        return self.os_sdk_conn.network.find_network_ip_availability(
            network_name_or_id, ignore_missing=ignore_missing, **args)

    def _get_ip_availability(self, network_name_or_id):
        """Resolve the network, then fetch its IP availability by ID."""
        network = self._find_ip_availability(
            network_name_or_id, ignore_missing=False)
        return self.os_sdk_conn.network.get_network_ip_availability(
            network.id)

    def _list_ip_availabilities(self, network_name):
        """List IP availability records filtered by ``network_name``."""
        return self.os_sdk_conn.network.network_ip_availabilities(
            network_name=network_name)

    # ------------------------------------------------------------------
    # Subnet pools
    # ------------------------------------------------------------------
    def _create_subnetpool(self, name, prefixes, shared=False,
                           autoclear=True):
        """Create a subnet pool and return it.

        When ``autoclear`` is true the new pool's ID is appended to
        ``self.subnet_pools_clearing``.
        """
        subnetpool = self.os_sdk_conn.network.create_subnet_pool(
            name=name, prefixes=prefixes, shared=shared)
        if debug1:
            print("created subnetpool: {} id: {}".format(
                subnetpool.name, subnetpool.id))
        if autoclear:
            self.subnet_pools_clearing.append(subnetpool.id)
        return subnetpool

    def _delete_subnetpool(self, name_or_id, autoclear=True):
        """Delete a subnet pool found by name or ID.

        When ``autoclear`` is true the pool's ID is also dropped from
        ``self.subnet_pools_clearing`` if it is registered there.
        """
        subnetpool = self._find_subnetpool(name_or_id, ignore_missing=False)
        self.os_sdk_conn.network.delete_subnet_pool(subnetpool.id)
        if debug1:
            print("deleted subnetpool: {} id: {}".format(
                subnetpool.name, subnetpool.id))
        # The backend delete has already succeeded at this point, so an ID
        # that was never registered (e.g. created with autoclear=False) must
        # not turn into a ValueError from list.remove().
        if autoclear and subnetpool.id in self.subnet_pools_clearing:
            self.subnet_pools_clearing.remove(subnetpool.id)

    def _update_subnetpool(self, name_or_id, **args):
        """Update a subnet pool found by name or ID with ``**args``."""
        subnetpool = self._find_subnetpool(name_or_id, ignore_missing=False)
        return self.os_sdk_conn.network.update_subnet_pool(
            subnetpool.id, **args)

    def _list_subnetpools(self):
        """Return the SDK's listing of subnet pools."""
        return self.os_sdk_conn.network.subnet_pools()

    def _find_subnetpool(self, name_or_id, ignore_missing=True, **args):
        """Find a subnet pool by name or ID.

        ``ignore_missing`` and any extra keyword arguments are forwarded
        unchanged to ``find_subnet_pool``.
        """
        return self.os_sdk_conn.network.find_subnet_pool(
            name_or_id, ignore_missing=ignore_missing, **args)

    def _get_subnetpool(self, name_or_id):
        """Resolve a subnet pool by name or ID, then fetch it by ID."""
        subnetpool = self._find_subnetpool(name_or_id, ignore_missing=False)
        return self.os_sdk_conn.network.get_subnet_pool(subnetpool.id)

    # ------------------------------------------------------------------
    # Address scopes
    # ------------------------------------------------------------------
    def _create_addrscope(self, name, ip_version=4, shared=False,
                          autoclear=True):
        """Create an address scope and return it.

        When ``autoclear`` is true the new scope's ID is appended to
        ``self.address_scopes_clearing``.
        """
        addrscope = self.os_sdk_conn.network.create_address_scope(
            name=name, ip_version=ip_version, shared=shared)
        if debug1:
            print("created addrscope: {} id: {}".format(
                addrscope.name, addrscope.id))
        if autoclear:
            self.address_scopes_clearing.append(addrscope.id)
        return addrscope

    def _delete_addrscope(self, name_or_id, autoclear=True):
        """Delete an address scope found by name or ID.

        When ``autoclear`` is true the scope's ID is also dropped from
        ``self.address_scopes_clearing`` if it is registered there.
        """
        addrscope = self._find_addrscope(name_or_id, ignore_missing=False)
        self.os_sdk_conn.network.delete_address_scope(addrscope.id)
        if debug1:
            print("deleted addrscope: {} id: {}".format(
                addrscope.name, addrscope.id))
        # Guarded for the same reason as in _delete_subnetpool: the remote
        # delete is done, an untracked ID must not raise ValueError here.
        if autoclear and addrscope.id in self.address_scopes_clearing:
            self.address_scopes_clearing.remove(addrscope.id)

    def _update_addrscope(self, name_or_id, new_name):
        """Rename an address scope found by name or ID to ``new_name``."""
        addrscope = self._find_addrscope(name_or_id, ignore_missing=False)
        return self.os_sdk_conn.network.update_address_scope(
            addrscope.id, name=new_name)

    def _list_addrscopes(self):
        """Return the SDK's listing of address scopes."""
        return self.os_sdk_conn.network.address_scopes()

    def _find_addrscope(self, name_or_id, ignore_missing=True, **args):
        """Find an address scope by name or ID.

        ``ignore_missing`` and any extra keyword arguments are forwarded
        unchanged to ``find_address_scope``.
        """
        return self.os_sdk_conn.network.find_address_scope(
            name_or_id, ignore_missing=ignore_missing, **args)

    def _get_addrscope(self, name_or_id):
        """Resolve an address scope by name or ID, then fetch it by ID."""
        addrscope = self._find_addrscope(name_or_id, ignore_missing=False)
        return self.os_sdk_conn.network.get_address_scope(addrscope.id)

    # ------------------------------------------------------------------
    # Floating IP port forwarding
    # ------------------------------------------------------------------
    def _create_portforwarding(self, fip_id, protocol, internal_ip_address,
                               internal_port, internal_port_id,
                               external_port):
        """Create a port forwarding on floating IP ``fip_id``.

        All arguments are passed to ``create_port_forwarding`` as keyword
        arguments; ``fip_id`` is sent as ``floatingip_id``.
        """
        return self.os_sdk_conn.network.create_port_forwarding(
            floatingip_id=fip_id,
            protocol=protocol,
            internal_ip_address=internal_ip_address,
            internal_port=internal_port,
            internal_port_id=internal_port_id,
            external_port=external_port,
        )

    def _delete_portforwarding(self, pf_id, fip_id):
        """Delete port forwarding ``pf_id`` of floating IP ``fip_id``."""
        return self.os_sdk_conn.network.delete_port_forwarding(pf_id, fip_id)

    def _update_portforwarding(self, pf_id, fip_id, **args):
        """Update port forwarding ``pf_id`` of ``fip_id`` with ``**args``."""
        return self.os_sdk_conn.network.update_port_forwarding(
            pf_id, fip_id, **args)

    def _list_portforwarding(self, fip_id):
        """Return the SDK's listing of port forwardings for ``fip_id``."""
        return self.os_sdk_conn.network.port_forwardings(fip_id)

    def _get_portforwarding(self, pf_id, fip_id):
        """Fetch port forwarding ``pf_id`` of floating IP ``fip_id``."""
        return self.os_sdk_conn.network.get_port_forwarding(pf_id, fip_id)

    # ------------------------------------------------------------------
    # Trunks
    # ------------------------------------------------------------------
    def _create_trunk(self, name, port_name_or_id, sub_ports,
                      autoclear=True):
        """Create a trunk on the given parent port and return it.

        The parent port is resolved with ``self._find_port``. When
        ``autoclear`` is true the new trunk's ID is appended to
        ``self.trunks_clearing``.
        """
        port = self._find_port(port_name_or_id, ignore_missing=False)
        trunk = self.os_sdk_conn.network.create_trunk(
            name=name, port_id=port.id, sub_ports=sub_ports)
        if debug1:
            print("created trunk: {} id: {}".format(trunk.name, trunk.id))
        if autoclear:
            self.trunks_clearing.append(trunk.id)
        return trunk

    def _delete_trunk(self, name_or_id, autoclear=True):
        """Delete a trunk found by name or ID.

        When ``autoclear`` is true the trunk's ID is also dropped from
        ``self.trunks_clearing`` if it is registered there.
        """
        trunk = self._find_trunk(name_or_id, ignore_missing=False)
        self.os_sdk_conn.network.delete_trunk(trunk.id)
        if debug1:
            print("deleted trunk: {} id: {}".format(trunk.name, trunk.id))
        # Guarded for the same reason as in _delete_subnetpool: the remote
        # delete is done, an untracked ID must not raise ValueError here.
        if autoclear and trunk.id in self.trunks_clearing:
            self.trunks_clearing.remove(trunk.id)

    def _update_trunk(self, name_or_id, **args):
        """Update a trunk found by name or ID with ``**args``.

        The resolved trunk object itself (not its ID) is handed to
        ``update_trunk``.
        """
        trunk = self._find_trunk(name_or_id, ignore_missing=False)
        return self.os_sdk_conn.network.update_trunk(trunk, **args)

    def _list_trunks(self):
        """Return the SDK's listing of trunks."""
        return self.os_sdk_conn.network.trunks()

    def _find_trunk(self, name_or_id, ignore_missing=True, **args):
        """Find a trunk by name or ID.

        ``ignore_missing`` and any extra keyword arguments are forwarded
        unchanged to ``find_trunk``.
        """
        return self.os_sdk_conn.network.find_trunk(
            name_or_id, ignore_missing=ignore_missing, **args)

    def _get_trunk(self, name_or_id):
        """Resolve a trunk by name or ID, then fetch it by ID."""
        trunk = self._find_trunk(name_or_id, ignore_missing=False)
        return self.os_sdk_conn.network.get_trunk(trunk.id)

    def _get_trunk_subports(self, name_or_id):
        """Return the ``'sub_ports'`` entry of a trunk's sub-port query.

        The result of ``get_trunk_subports`` is read with ``.get``, so a
        missing ``'sub_ports'`` key yields ``None``.
        """
        trunk = self._find_trunk(name_or_id, ignore_missing=False)
        subports = self.os_sdk_conn.network.get_trunk_subports(trunk)
        return subports.get('sub_ports')

    def _add_trunk_subport(self, trunk_name_or_id, port_name_or_id, seg_id,
                           seg_type):
        """Attach one port to a trunk as a sub-port.

        ``seg_id`` and ``seg_type`` are sent as ``segmentation_id`` and
        ``segmentation_type`` of the single sub-port entry.
        """
        trunk = self._find_trunk(trunk_name_or_id, ignore_missing=False)
        port = self._find_port(port_name_or_id, ignore_missing=False)
        port_list = [{
            'port_id': port.id,
            'segmentation_id': seg_id,
            'segmentation_type': seg_type,
        }]
        return self.os_sdk_conn.network.add_trunk_subports(
            trunk.id, port_list)

    def _remove_trunk_subport(self, trunk_name_or_id, port_name_or_id):
        """Detach one port from a trunk's sub-ports."""
        trunk = self._find_trunk(trunk_name_or_id, ignore_missing=False)
        port = self._find_port(port_name_or_id, ignore_missing=False)
        port_list = [{'port_id': port.id}]
        return self.os_sdk_conn.network.delete_trunk_subports(
            trunk.id, port_list)

    # ------------------------------------------------------------------
    # RBAC policies
    # ------------------------------------------------------------------
    def _create_rbac_policy(self, action, network_id, target_tenant):
        """Create an RBAC policy for a network.

        ``object_type`` is always ``"network"``; ``network_id`` is sent as
        ``object_id``.
        """
        return self.os_sdk_conn.network.create_rbac_policy(
            action=action,
            object_id=network_id,
            object_type="network",
            target_tenant=target_tenant)

    def _delete_rbac_policy(self, policy_id):
        """Delete the RBAC policy ``policy_id``."""
        return self.os_sdk_conn.network.delete_rbac_policy(policy_id)

    def _update_rbac_policy(self, policy_id, **args):
        """Update the RBAC policy ``policy_id`` with ``**args``."""
        return self.os_sdk_conn.network.update_rbac_policy(policy_id, **args)

    def _list_rbac_policies(self):
        """Return the SDK's listing of RBAC policies."""
        return self.os_sdk_conn.network.rbac_policies()

    def _find_rbac_policy(self, policy_id, ignore_missing=True, **args):
        """Find an RBAC policy by ID.

        ``ignore_missing`` and any extra keyword arguments are forwarded
        unchanged to ``find_rbac_policy``.
        """
        return self.os_sdk_conn.network.find_rbac_policy(
            policy_id, ignore_missing=ignore_missing, **args)

    def _get_rbac_policy(self, policy_id):
        """Fetch the RBAC policy ``policy_id``."""
        return self.os_sdk_conn.network.get_rbac_policy(policy_id)

    # ------------------------------------------------------------------
    # Security group rules
    # ------------------------------------------------------------------
    def _create_security_group_rule(self, name_or_id, direction, protocol,
                                    ethertype, **attrs):
        """Create a rule in the security group found by ``name_or_id``.

        The group is resolved with ``self._find_security_group``; extra
        keyword arguments are forwarded to ``create_security_group_rule``.
        """
        sg = self._find_security_group(name_or_id, ignore_missing=False)
        return self.os_sdk_conn.network.create_security_group_rule(
            security_group_id=sg.id,
            direction=direction,
            protocol=protocol,
            ethertype=ethertype,
            **attrs)

    def _delete_security_group_rule(self, rule_id):
        """Delete the security group rule ``rule_id``."""
        return self.os_sdk_conn.network.delete_security_group_rule(rule_id)

    def _list_security_group_rules(self, sg_id):
        """List rules filtered by ``security_group_id=sg_id``."""
        return self.os_sdk_conn.network.security_group_rules(
            security_group_id=sg_id)

    def _find_security_group_rule(self, name_or_id, ignore_missing=True,
                                  **args):
        """Find a security group rule by name or ID.

        ``ignore_missing`` and any extra keyword arguments are forwarded
        unchanged to ``find_security_group_rule``.
        """
        return self.os_sdk_conn.network.find_security_group_rule(
            name_or_id, ignore_missing=ignore_missing, **args)

    def _get_security_group_rule(self, sg_id):
        """Fetch a security group rule.

        NOTE(review): the parameter is named ``sg_id`` but is passed
        straight to ``get_security_group_rule``, so it is presumably a
        rule identifier rather than a security group ID -- confirm with
        callers. The name is kept for backward compatibility.
        """
        return self.os_sdk_conn.network.get_security_group_rule(sg_id)