From 319c274e09ade2804e055fd50c582fe83dafef0a Mon Sep 17 00:00:00 2001 From: Heitor Matsui Date: Mon, 17 May 2021 18:08:45 -0300 Subject: [PATCH] RBAC Patch 4: Neutron tests This patch chain aims to suggest a set of default policies for user management on stx-openstack. We suggest the creation of the project_admin and project_readonly roles and provide some policies to fine tune the access control over the Openstack services to those roles, as described on README.md. Also, we provide a set of tests to ensure the policies and permissions are all working as expected on site for the cloud administrators. This commit includes Neutron related tests and functions. Story: 2008910 Task: 42501 Signed-off-by: Heitor Matsui Signed-off-by: Thiago Brito Co-authored-by: Miriam Yumi Peixoto Co-authored-by: Leonardo Zaccarias Co-authored-by: Rogerio Oliveira Ferraz Change-Id: I4d8d487ba8623b7817d88920742a4c465d85a135 --- enhanced-policies/tests/fv_rbac.py | 773 ++++++++- .../tests/test_neutron/__init__.py | 0 .../tests/test_neutron/rbac_neutron.py | 189 ++ .../tests/test_neutron/test_rbac_neutron.py | 1517 +++++++++++++++++ 4 files changed, 2478 insertions(+), 1 deletion(-) create mode 100644 enhanced-policies/tests/test_neutron/__init__.py create mode 100644 enhanced-policies/tests/test_neutron/rbac_neutron.py create mode 100644 enhanced-policies/tests/test_neutron/test_rbac_neutron.py diff --git a/enhanced-policies/tests/fv_rbac.py b/enhanced-policies/tests/fv_rbac.py index 140beb75..ec75a589 100644 --- a/enhanced-policies/tests/fv_rbac.py +++ b/enhanced-policies/tests/fv_rbac.py @@ -877,4 +877,775 @@ class OpenStackBasicTesting(): # Returns # None """ - self.os_sdk_conn.image.reactivate_image(image.id) \ No newline at end of file + self.os_sdk_conn.image.reactivate_image(image.id) + + # ------------------------------------------------------------------------- + # Network methods - Neutron + # ------------------------------------------------------------------------- + + def _create_floatingip(self, subnet_id, floating_network_id, port_id=None, + autoclear=True, **attrs): + """ + # Create a new floating ip from attributes + # Parameters + # autoclear – Used in the teardown mechanism (keep default value) + # attrs (dict) – Keyword arguments which will be used to create a + # FloatingIP, comprised of the properties on the FloatingIP class. + # Returns + # The results of floating ip creation + # Return type + # FloatingIP + """ + fip = self.os_sdk_conn.network.create_ip( + subnet_id=subnet_id, + floating_network_id=floating_network_id, + port_id=port_id, **attrs + ) + if debug1: print("created fip: " + fip.name + " id: " + fip.id) + if autoclear: + self.floating_ips_clearing.append(fip.id) + return fip + + def _delete_floatingip(self, fip_name_or_id, if_revision=None, + autoclear=True): + """ + # Delete a floating ip + # Parameters + # fip_name_or_id – The name or ID of an IP or a FloatingIP instance. + # ignore_missing (bool) – When set to False ResourceNotFound will be + # raised when the floating ip does not exist. When set to True, no + # exception will be set when attempting to delete a nonexistent ip. + # if_revision (int) – Revision to put in If-Match header of update + # request to perform compare-and-swap update. + # autoclear – Used in the teardown mechanism (keep default value) + # Returns + # None + """ + fip = self._find_floatingip(fip_name_or_id, ignore_missing=False) + self.os_sdk_conn.network.delete_ip(fip.id, if_revision=if_revision) + if debug1: + print("deleted fip: " + fip.name + " id: " + fip.id) + if autoclear: + self.floating_ips_clearing.remove(fip.id) + + def _update_floatingip(self, fip_name_or_id, if_revision=None, **args): + fip = self._find_floatingip(fip_name_or_id, ignore_missing=False) + """ + # Update a ip + # Parameters + # fip_name_or_id – The name or ID of an IP or a FloatingIP instance. + # if_revision (int) – Revision to put in If-Match header of update request + # to perform compare-and-swap update. + # attrs (dict) – The attributes to update on the ip represented by value. + # Returns + # The updated ip + # Return type + # FloatingIP + """ + return self.os_sdk_conn.network.update_ip(fip.id, + if_revision=if_revision, + **args) + + def _list_floatingips(self, **query): + """ + # Return a generator of ips + # Parameters + # query (dict) – + # Optional query parameters to be sent to limit + # the resources being returned. Valid parameters are: + # description: The description of a floating IP. + # fixed_ip_address: The fixed IP address associated with a + # floating IP address. + # floating_ip_address: The IP address of a floating IP. + # floating_network_id: The ID of the network associated with + # a floating IP. + # port_id: The ID of the port to which a floating IP is + # associated. + # project_id: The ID of the project a floating IP is + # associated with. + # router_id: The ID of an associated router. + # status: The status of a floating IP, which can be ACTIVE + # or DOWN. + # Returns + # A generator of floating IP objects + # Return type + # FloatingIP + """ + return self.os_sdk_conn.network.ips(**query) + + def _find_floatingip(self, fip_name_or_id, ignore_missing=True, **args): + """ + # Find a single FloatingIP + # Parameters + # fip_name_or_id – The name or ID of a FloatingIP instance. + # ignore_missing (bool) – When set to False ResourceNotFound will be raised + # when the resource does not exist. When set to True, None will be returned + # when attempting to find a nonexistent resource. + # args (dict) – Any additional parameters to be passed into underlying + # methods. such as query filters. + # Returns + # One FloatingIP or None + """ + return self.os_sdk_conn.network.find_ip(fip_name_or_id, + ignore_missing=ignore_missing, + **args) + + def _get_floatingip(self, fip_name_or_id): + """ + # Get a single floating ip + # Parameters + # fip_name_or_id – The name or ID of a FloatingIP instance. + # Returns + # One FloatingIP + # Raises + # ResourceNotFound when no resource can be found. + """ + fip = self._find_floatingip(fip_name_or_id, ignore_missing=False) + return self.os_sdk_conn.network.get_ip(fip.id) + + def _create_router(self, name, ext_network_name, autoclear=True, **attrs): + """ + # Create a new router from attributes + # Parameters + # autoclear – Used in the teardown mechanism (keep default value) + # attrs (dict) – Keyword arguments which will be used to create a Router, + # comprised of the properties on the Router class. + # Returns + # The results of router creation + # Return type + # Router + """ + network = self._get_network(ext_network_name) + router = self.os_sdk_conn.network.create_router( + name=name, + external_gateway_info={'network_id': network.id}, + **attrs + ) + if debug1: print( + "created router: " + router.name + " id: " + router.id) + if autoclear: + self.routers_clearing.append(router.id) + return router + + def _delete_router(self, router_name_or_id, ignore_missing=True, + if_revision=None, autoclear=True): + """ + # Delete a router + # Parameters + # router_name_or_id – The name or ID of a Router instance. + # ignore_missing (bool) – When set to False ResourceNotFound will be raised + # when the router does not exist. When set to True, no exception will be + # set when attempting to delete a nonexistent router. + # if_revision (int) – Revision to put in If-Match header of update request + # to perform compare-and-swap update. + # autoclear – Used in the teardown mechanism (keep default value) + # Returns + # None + """ + router = self._find_router(router_name_or_id, ignore_missing=False) + self.os_sdk_conn.network.delete_router(router.id, + ignore_missing=ignore_missing, + if_revision=if_revision) + if debug1: print( + "deleted router: " + router.name + " id: " + router.id) + if autoclear: + self.routers_clearing.remove(router.id) + + def _update_router(self, router_name_or_id, if_revision=None, **args): + """ + # Update a router + # Parameters + # router_name_or_id – The name or ID of a Router instance. + # if_revision (int) – Revision to put in If-Match header of update request + # to perform compare-and-swap update. + # attrs (dict) – The attributes to update on the router represented by + # router. + # Returns + # The updated router + # Return type + # Router + """ + router = self._find_router(router_name_or_id, ignore_missing=False) + return self.os_sdk_conn.network.update_router(router.id, + if_revision=if_revision, + **args) + + def _list_routers(self, **query): + """ + # Return a generator of routers + # Parameters + # query (dict) – + # Optional query parameters to be sent to limit + # the resources being returned. Valid parameters are: + # description: The description of a router. + # flavor_id: The ID of the flavor. + # is_admin_state_up: Router administrative state is up or not + # is_distributed: The distributed state of a router + # is_ha: The highly-available state of a router + # name: Router name + # project_id: The ID of the project this router is associated + # with. + # status: The status of the router. + # Returns + # A generator of router objects + # Return type + # Router + """ + return self.os_sdk_conn.network.routers(**query) + + def _find_router(self, router_name_or_id, ignore_missing=True, **args): + """ + # Find a single router + # Parameters + # router_name_or_id – The name or ID of a router. + # ignore_missing (bool) – When set to False ResourceNotFound will be raised + # when the resource does not exist. When set to True, None will be returned + # when attempting to find a nonexistent resource. + # args (dict) – Any additional parameters to be passed into underlying + # methods. such as query filters. + # Returns + # One Router or None + """ + return self.os_sdk_conn.network.find_router(router_name_or_id, + ignore_missing=ignore_missing, + **args) + + def _get_router(self, router_name_or_id): + """ + # Get a single router + # Parameters + # router_name_or_id – The name or ID of a Router instance. + # Returns + # One Router or None + """ + router = self._find_router(router_name_or_id, ignore_missing=False) + return self.os_sdk_conn.network.get_router(router.id) + + def _add_interface_to_router(self, ri, autoclear=True): + """ + # Add Interface to a router + # Parameters + # ri – ID of an OpenStackRouterInterface instance + # autoclear – Used in the teardown mechanism (keep default value) + # Returns + # Router with updated interface + # Return type + # class + # ~openstack.network.v2.router.Router + """ + router = self._find_router(ri.router_name_or_id, ignore_missing=False) + subnet = self._find_subnet(ri.subnet_name_or_id, ignore_missing=False) + interface = self.os_sdk_conn.network.add_interface_to_router( + router.id, + subnet_id=subnet.id + ) + if debug1: + print("added interface to router " + router.name + " : " + + subnet.name + " id: " + subnet.id) + if autoclear: + self.interfaces_clearing.append(ri) + return interface + + def _delete_interface_from_router(self, ri, autoclear=True): + """ + # Remove Interface from a router + # Parameters + # ri – ID of an OpenStackRouterInterface instance + # autoclear – Used in the teardown mechanism (keep default value) + # Returns + # Router with updated interface + # Return type + # class + # ~openstack.network.v2.router.Router + """ + router = self._find_router(ri.router_name_or_id, ignore_missing=False) + subnet = self._find_subnet(ri.subnet_name_or_id, ignore_missing=False) + self.os_sdk_conn.network.remove_interface_from_router( + router.id, + subnet_id=subnet.id + ) + if debug1: + print("removed interface from router " + router.name + " : " + + subnet.name + " id: " + subnet.id) + if autoclear: + self.interfaces_clearing.remove(ri) + + def _create_network(self, name, shared=False, autoclear=True, **args): + """ + # Create a new network from attributes + # Parameters + # autoclear – Used in the teardown mechanism (keep default value) + # attrs (dict) – Keyword arguments which will be used to create a Network, + # comprised of the properties on the Network class. + # Returns + # The results of network creation + # Return type + # Network + """ + conn = self.os_sdk_conn + network = conn.network.create_network(name=name, shared=shared, **args) + if debug1: print( + "created network: " + network.name + " id: " + network.id) + if autoclear: + self.networks_clearing.append(network.id) + return network + + def _delete_network(self, network_name_or_id, if_revision=None, + autoclear=True): + """ + # Delete a network + # Parameters + # network_name_or_id – The name or ID of a Network instance. + # ignore_missing (bool) – When set to False ResourceNotFound will be raised + # when the network does not exist. When set to True, no exception will be + # set when attempting to delete a nonexistent network. + # if_revision (int) – Revision to put in If-Match header of update request + # to perform compare-and-swap update. + # autoclear – Used in the teardown mechanism (keep default value) + # Returns + # None + """ + network = self._find_network(network_name_or_id, ignore_missing=False) + self.os_sdk_conn.network.delete_network(network.id, + if_revision=if_revision) + if debug1: print( + "deleted network: " + network.name + " id: " + network.id) + if autoclear: + self.networks_clearing.remove(network.id) + + def _update_network(self, network_name_or_id, if_revision=None, **args): + """ + # Update a network + # Parameters + # network_name_or_id – The name or ID of a Network instance. + # if_revision (int) – Revision to put in If-Match header of update request + # to perform compare-and-swap update. + # attrs (dict) – The attributes to update on the network represented by + # network. + # Returns + # The updated network + # Return type + # Network + """ + network = self._find_network(network_name_or_id, ignore_missing=False) + return self.os_sdk_conn.network.update_network(network.id, + if_revision=if_revision, + **args) + + def _list_networks(self, **query): + """ + # Return a generator of networks + # Parameters + # query (kwargs) – + # Optional query parameters to be sent to limit the resources being + # returned. Available parameters include: + # description: The network description. + # ipv4_address_scope_id: The ID of the IPv4 address scope for + # the network. + # ipv6_address_scope_id: The ID of the IPv6 address scope for + # the network. + # is_admin_state_up: Network administrative state + # is_port_security_enabled: The port security status. + # is_router_external: Network is external or not. + # is_shared: Whether the network is shared across projects. + # name: The name of the network. + # status: Network status + # project_id: Owner tenant ID + # provider_network_type: Network physical mechanism + # provider_physical_network: Physical network + # provider_segmentation_id: VLAN ID for VLAN networks or Tunnel + # ID for GENEVE/GRE/VXLAN networks + # Returns + # A generator of network objects + # Return type + # Network + """ + return self.os_sdk_conn.list_networks(**query) + + def _find_network(self, network_name_or_id, ignore_missing=True, **args): + """ + # Find a single network + # Parameters + # network_name_or_id – The name or ID of a Network instance. + # ignore_missing (bool) – When set to False ResourceNotFound will be raised + # when the resource does not exist. When set to True, None will be returned when attempting to find a nonexistent resource. + # args (dict) – Any additional parameters to be passed into underlying + # methods. such as query filters. + # Returns + # One Network or None + """ + return self.os_sdk_conn.network.find_network(network_name_or_id, + ignore_missing=ignore_missing, + **args) + + def _get_network(self, network_name_or_id): + """ + # Get a single network + # Parameters + # network_name_or_id – The name or ID of a Network instance. + # Returns + # One Network or None + """ + network = self._find_network(network_name_or_id, ignore_missing=False) + return self.os_sdk_conn.network.get_network(network.id) + + def _create_subnet(self, name, network_name_or_id, enable_dhcp=True, + ip_version=4, cidr=None, gateway_ip=None, + autoclear=True, **attrs): + """ + # Create a new subnet from attributes + # Parameters + # autoclear – Used in the teardown mechanism (keep default value) + # attrs (dict) – Keyword arguments which will be used to create a Subnet, + # comprised of the properties on the Subnet class. + # Returns + # The results of subnet creation + # Return type + # Subnet + """ + network = self._find_network(network_name_or_id, ignore_missing=False) + subnet = self.os_sdk_conn.network.create_subnet( + name=name, + network_id=network.id, + enable_dhcp=enable_dhcp, + cidr=cidr, + gateway_ip=gateway_ip, + ip_version=ip_version, + **attrs + ) + if debug1: print( + "created subnet: " + subnet.name + " id: " + subnet.id) + if autoclear: + self.subnets_clearing.append(subnet.id) + return subnet + + def _delete_subnet(self, subnet_name_or_id, if_revision=None, + autoclear=True): + """ + # Delete a subnet + # Parameters + # subnet_name_or_id – The name or ID of a Subnet instance. + # ignore_missing (bool) – When set to False ResourceNotFound will be raised + # when the subnet does not exist. When set to True, no exception will be + # set when attempting to delete a nonexistent subnet. + # if_revision (int) – Revision to put in If-Match header of update request + # to perform compare-and-swap update. + # autoclear – Used in the teardown mechanism (keep default value) + # Returns + # None + """ + subnet = self._find_subnet(subnet_name_or_id, ignore_missing=False) + self.os_sdk_conn.network.delete_subnet(subnet.id, + if_revision=if_revision) + if debug1: print( + "deleted subnet: " + subnet.name + " id: " + subnet.id) + if autoclear: + self.subnets_clearing.remove(subnet.id) + + def _update_subnet(self, subnet_name_or_id, if_revision=None, **args): + """ + # Update a subnet + # Parameters + # subnet_name_or_id – The name or ID of a Subnet instance. + # if_revision (int) – Revision to put in If-Match header of update request + # to perform compare-and-swap update. + # attrs (dict) – The attributes to update on the subnet represented by + # subnet. + # Returns + # The updated subnet + # Return type + # Subnet + """ + subnet = self._find_subnet(subnet_name_or_id, ignore_missing=False) + return self.os_sdk_conn.network.update_subnet(subnet.id, + if_revision=if_revision, + **args) + + def _list_subnets(self, **query): + """ + # Return a generator of subnets + # Parameters + # query (dict) – + # Optional query parameters to be sent to limit the resources being + # returned. Available parameters include: + # cidr: Subnet CIDR + # description: The subnet description + # gateway_ip: Subnet gateway IP address + # ip_version: Subnet IP address version + # ipv6_address_mode: The IPv6 address mode + # ipv6_ra_mode: The IPv6 router advertisement mode + # is_dhcp_enabled: Subnet has DHCP enabled (boolean) + # name: Subnet name + # network_id: ID of network that owns the subnets + # project_id: Owner tenant ID + # subnet_pool_id: The subnet pool ID from which to obtain a + # CIDR. + # Returns + # A generator of subnet objects + # Return type + # Subnet + """ + return self.os_sdk_conn.list_subnets(**query) + + def _find_subnet(self, subnet_name_or_id, ignore_missing=True, **args): + """ + # Find a single subnet + # Parameters + # subnet_name_or_id – The name or ID of a Subnet instance. + # ignore_missing (bool) – When set to False ResourceNotFound will be raised + # when the resource does not exist. When set to True, None will be returned + # when attempting to find a nonexistent resource. + # args (dict) – Any additional parameters to be passed into underlying + # methods. such as query filters. + # Returns + # One Subnet or None + """ + return self.os_sdk_conn.network.find_subnet( + subnet_name_or_id, + ignore_missing=ignore_missing, + **args + ) + + def _get_subnet(self, subnet_name_or_id): + """ + # Get a single subnet + # Parameters + # subnet_name_or_id – The name or ID of a Subnet instance. + # Returns + # One Subnet or None + """ + subnet = self._find_subnet(subnet_name_or_id, ignore_missing=False) + return self.os_sdk_conn.network.get_subnet(subnet.id) + + def _create_port(self, port_name, network_name_or_id, autoclear=True, + **attrs): + """ + # Create a new port from attributes + # Parameters + # autoclear – Used in the teardown mechanism (keep default value) + # attrs (dict) – Keyword arguments which will be used to create a Port, + # comprised of the properties on the Port class. + # Returns + # The results of port creation + # Return type + # Port + """ + network = self._find_network(network_name_or_id, ignore_missing=False) + port = self.os_sdk_conn.network.create_port(name=port_name, + network_id=network.id, + **attrs) + if debug1: print("created port id: " + port.id) + if autoclear: + self.ports_clearing.append(port.id) + return port + + def _delete_port(self, port_name_or_id, if_revision=None, autoclear=True): + """ + # Delete a port + # Parameters + # port_name_or_id – The name or ID of a Port instance. + # ignore_missing (bool) – When set to False ResourceNotFound will be raised + # when the port does not exist. When set to True, no exception will be set + # when attempting to delete a nonexistent port. + # if_revision (int) – Revision to put in If-Match header of update request + # to perform compare-and-swap update. + # autoclear – Used in the teardown mechanism (keep default value) + # Returns + # None + """ + port = self._find_port(port_name_or_id, ignore_missing=False) + self.os_sdk_conn.network.delete_port(port.id, if_revision=if_revision) + if debug1: print("deleted port id: " + port.id) + if autoclear: + self.ports_clearing.remove(port.id) + + def _update_port(self, port_name_or_id, if_revision=None, **args): + """ + # Update a port + # Parameters + # port_name_or_id – The name or ID of a Port instance. + # if_revision (int) – Revision to put in If-Match header of update request + # to perform compare-and-swap update. + # attrs (dict) – The attributes to update on the port represented by port. + # Returns + # The updated port + # Return type + # Port + """ + port = self._find_port(port_name_or_id, ignore_missing=False) + return self.os_sdk_conn.network.update_port(port.id, + if_revision=if_revision, + **args) + + def _list_ports(self, **kwargs): + """ + # Return a generator of ports + # Parameters + # query (kwargs) – + # Optional query parameters to be sent to limit the resources being + # returned. Available parameters include: + # description: The port description. + # device_id: Port device ID. + # device_owner: Port device owner (e.g. network:dhcp). + # ip_address: IP addresses of an allowed address pair. + # is_admin_state_up: The administrative state of the port. + # is_port_security_enabled: The port security status. + # mac_address: Port MAC address. + # name: The port name. + # network_id: ID of network that owns the ports. + # project_id: The ID of the project who owns the network. + # status: The port status. Value is ACTIVE or DOWN. + # subnet_id: The ID of the subnet. + # Returns + # A generator of port objects + # Return type + # Port + """ + return self.os_sdk_conn.network.ports(**kwargs) + + def _find_port(self, port_name_or_id, ignore_missing=True, **args): + """ + # Find a single port + # Parameters + # port_name_or_id – The name or ID of a Port instance. + # ignore_missing (bool) – When set to False ResourceNotFound will be raised + # when the resource does not exist. When set to True, None will be returned + # when attempting to find a nonexistent resource. + # args (dict) – Any additional parameters to be passed into underlying + # methods. such as query filters. + # Returns + # One Port or None + """ + return self.os_sdk_conn.network.find_port(port_name_or_id, + ignore_missing=True, **args) + + def _get_port(self, port_name_or_id): + """ + # Get a single port + # Parameters + # port_name_or_id – The name or ID of a Port instance. + # Returns + # One Port + # Raises + # ResourceNotFound when no resource can be found. + """ + port = self._find_port(port_name_or_id, ignore_missing=False) + return self.os_sdk_conn.network.get_port(port.id) + + def _create_security_group(self, name, autoclear=True, **attrs): + """ + # Create a new security group from attributes + # Parameters + # autoclear – Used in the teardown mechanism (keep default value) + # attrs (dict) – Keyword arguments which will be used to create a + # SecurityGroup, comprised of the properties on the SecurityGroup + # class. + # Returns + # The results of security group creation + # Return type + # SecurityGroup + """ + sg = self.os_sdk_conn.network.create_security_group(name=name, **attrs) + if debug1: print("created SG: " + sg.name + " id: " + sg.id) + if autoclear: + self.security_groups_clearing.append(sg.id) + return sg + + def _delete_security_group(self, sg_name_or_id, ignore_missing=True, + if_revision=None, autoclear=True): + """ + # Delete a security group + # Parameters + # sg_name_or_id – The name or ID of a SecurityGroup instance. + # ignore_missing (bool) – When set to False ResourceNotFound will be + # raised when the security group does not exist. When set to True, no + # exception will be set when attempting to delete a nonexistent + # security group. + # if_revision (int) – Revision to put in If-Match header of update + # request to perform compare-and-swap update. + # autoclear – Used in the teardown mechanism (keep default value) + # Returns + # None + """ + sg = self._find_security_group(sg_name_or_id, ignore_missing=False) + self.os_sdk_conn.network.delete_security_group( + sg.id, + ignore_missing=ignore_missing, + if_revision=if_revision + ) + if debug1: print("deleted SG: " + sg.name + " id: " + sg.id) + if autoclear: + self.security_groups_clearing.remove(sg.id) + + def _update_security_group(self, sg_name_or_id, description=None, + if_revision=None, **attrs): + """ + # Update a security group + # Parameters + # sg_name_or_id – The name or ID of a SecurityGroup instance. + # attrs (dict) – The attributes to update on the security group + # represented by security_group. + # Returns + # The updated security group + # Return type + # SecurityGroup + """ + sg = self._find_security_group(sg_name_or_id, ignore_missing=False) + return self.os_sdk_conn.network.update_security_group( + sg.id, + description=description, + if_revision=if_revision, + **attrs + ) + + def _list_security_groups(self, **query): + """ + # Return a generator of security groups + # Parameters + # query (dict) – + # Optional query parameters to be sent to limit the resources being + # returned. + # Valid parameters are: + # description: Security group description + # ìd: The id of a security group, or list of security group ids + # name: The name of a security group + # project_id: The ID of the project this security group is + # associated with. + # Returns + # A generator of security group objects + # Return type + # SecurityGroup + """ + return self.os_sdk_conn.network.security_groups(**query) + + def _find_security_group(self, sg_name_or_id, ignore_missing=True, **args): + """ + # Find a single security group + # Parameters + # sg_name_or_id – The name or ID of a SecurityGroup instance. + # ignore_missing (bool) – When set to False ResourceNotFound will be + # raised when the resource does not exist. When set to True, None + # will be returned when attempting to find a nonexistent resource. + # args (dict) – Any additional parameters to be passed into + # underlying methods. such as query filters. + # Returns + # One SecurityGroup or None + """ + return self.os_sdk_conn.network.find_security_group( + sg_name_or_id, + ignore_missing=ignore_missing, + **args + ) + + def _get_security_group(self, sg_name_or_id): + """ + # Get a single security group + # Parameters + # sg_name_or_id – The name or ID of a SecurityGroup instance. + # Returns + # One SecurityGroup + # Raises + # ResourceNotFound when no resource can be found. + """ + sg = self._find_security_group(sg_name_or_id, ignore_missing=False) + return self.os_sdk_conn.network.get_security_group(sg.id) \ No newline at end of file diff --git a/enhanced-policies/tests/test_neutron/__init__.py b/enhanced-policies/tests/test_neutron/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/enhanced-policies/tests/test_neutron/rbac_neutron.py b/enhanced-policies/tests/test_neutron/rbac_neutron.py new file mode 100644 index 00000000..cad6699b --- /dev/null +++ b/enhanced-policies/tests/test_neutron/rbac_neutron.py @@ -0,0 +1,189 @@ +# +# Copyright (c) 2021 Wind River Systems, Inc. +# +# SPDX-License-Identifier: Apache-2.0 +# +# All Rights Reserved. +# + +from tests.fv_rbac import OpenStackBasicTesting +from tests.fv_rbac import debug1 + +class OpenStackNetworkingTesting(OpenStackBasicTesting): + + def _find_ip_availability(self, network_name_or_id, ignore_missing=True, **args): + return self.os_sdk_conn.network.find_network_ip_availability(network_name_or_id, ignore_missing=ignore_missing, **args) + + def _get_ip_availability(self, network_name_or_id): + network = self._find_ip_availability(network_name_or_id, ignore_missing=False) + return self.os_sdk_conn.network.get_network_ip_availability(network.id) + + def _list_ip_availabilities(self, network_name): + return self.os_sdk_conn.network.network_ip_availabilities(network_name=network_name) + + def _create_subnetpool(self, name, prefixes, shared=False, autoclear=True): + subnetpool = self.os_sdk_conn.network.create_subnet_pool( + name=name, prefixes=prefixes, shared=shared) + if debug1: print("created subnetpool: " + subnetpool.name + " id: " + subnetpool.id) + if autoclear: + self.subnet_pools_clearing.append(subnetpool.id) + return subnetpool + + def _delete_subnetpool(self, name_or_id, autoclear=True): + subnetpool = self._find_subnetpool(name_or_id, ignore_missing=False) + self.os_sdk_conn.network.delete_subnet_pool(subnetpool.id) + if debug1: print("deleted subnetpool: " + subnetpool.name + " id: " + subnetpool.id) + if autoclear: + self.subnet_pools_clearing.remove(subnetpool.id) + + def _update_subnetpool(self, name_or_id, **args): + subnetpool = self._find_subnetpool(name_or_id, ignore_missing=False) + return self.os_sdk_conn.network.update_subnet_pool(subnetpool.id, **args) + + def _list_subnetpools(self): + return self.os_sdk_conn.network.subnet_pools() + + def _find_subnetpool(self, name_or_id, ignore_missing=True, **args): + return self.os_sdk_conn.network.find_subnet_pool(name_or_id, ignore_missing=ignore_missing, **args) + + def _get_subnetpool(self, name_or_id): + subnetpool = self._find_subnetpool(name_or_id, ignore_missing=False) + return self.os_sdk_conn.network.get_subnet_pool(subnetpool.id) + + def _create_addrscope(self, name, ip_version=4, shared=False, autoclear=True): + addrscope = self.os_sdk_conn.network.create_address_scope(name=name, ip_version=ip_version, shared=shared) + if debug1: print("created addrscope: " + addrscope.name + " id: " + addrscope.id) + if autoclear: + self.address_scopes_clearing.append(addrscope.id) + return addrscope + + def _delete_addrscope(self, name_or_id, autoclear=True): + addrscope = self._find_addrscope(name_or_id, ignore_missing=False) + self.os_sdk_conn.network.delete_address_scope(addrscope.id) + if debug1: print("deleted addrscope: " + addrscope.name + " id: " + addrscope.id) + if autoclear: + self.address_scopes_clearing.remove(addrscope.id) + + def _update_addrscope(self, name_or_id, new_name): + addrscope = self._find_addrscope(name_or_id, ignore_missing=False) + return self.os_sdk_conn.network.update_address_scope(addrscope.id, name=new_name) + + def _list_addrscopes(self): + return self.os_sdk_conn.network.address_scopes() + + def _find_addrscope(self, name_or_id, ignore_missing=True, **args): + return self.os_sdk_conn.network.find_address_scope(name_or_id, ignore_missing=ignore_missing, **args) + + def _get_addrscope(self, name_or_id): + addrscope = self._find_addrscope(name_or_id, ignore_missing=False) + return self.os_sdk_conn.network.get_address_scope(addrscope.id) + + def _create_portforwarding(self, fip_id, protocol, internal_ip_address, internal_port, internal_port_id, external_port): + return self.os_sdk_conn.network.create_port_forwarding( + floatingip_id=fip_id, + protocol=protocol, + internal_ip_address=internal_ip_address, + internal_port=internal_port, + internal_port_id=internal_port_id, + external_port=external_port + ) + + def _delete_portforwarding(self, pf_id, fip_id): + return self.os_sdk_conn.network.delete_port_forwarding(pf_id, fip_id) + + def _update_portforwarding(self, pf_id, fip_id, **args): + return self.os_sdk_conn.network.update_port_forwarding(pf_id, fip_id, **args) + + def _list_portforwarding(self, fip_id): + return self.os_sdk_conn.network.port_forwardings(fip_id) + + def _get_portforwarding(self, pf_id, fip_id): + return self.os_sdk_conn.network.get_port_forwarding(pf_id, fip_id) + + def _create_trunk(self, name, port_name_or_id, sub_ports, autoclear=True): + port = self._find_port(port_name_or_id, ignore_missing=False) + trunk = self.os_sdk_conn.network.create_trunk(name=name, port_id=port.id, sub_ports=sub_ports) + if debug1: print("created trunk: " + trunk.name + " id: " + trunk.id) + if autoclear: + self.trunks_clearing.append(trunk.id) + return trunk + + def _delete_trunk(self, name_or_id, autoclear=True): + trunk = self._find_trunk(name_or_id, ignore_missing=False) + self.os_sdk_conn.network.delete_trunk(trunk.id) + if debug1: print("deleted trunk: " + trunk.name + " id: " + trunk.id) + if autoclear: + self.trunks_clearing.remove(trunk.id) + + def _update_trunk(self, name_or_id, **args): + trunk = self._find_trunk(name_or_id, ignore_missing=False) + return self.os_sdk_conn.network.update_trunk(trunk, **args) + + def _list_trunks(self): + return self.os_sdk_conn.network.trunks() + + def _find_trunk(self, name_or_id, ignore_missing=True, **args): + return self.os_sdk_conn.network.find_trunk(name_or_id, ignore_missing=ignore_missing, **args) + + def _get_trunk(self, name_or_id): + trunk = self._find_trunk(name_or_id, ignore_missing=False) + return self.os_sdk_conn.network.get_trunk(trunk.id) + + def _get_trunk_subports(self, name_or_id): + trunk = self._find_trunk(name_or_id, ignore_missing=False) + subports = self.os_sdk_conn.network.get_trunk_subports(trunk) + return subports.get('sub_ports') + + def _add_trunk_subport(self, trunk_name_or_id, port_name_or_id, seg_id, seg_type): + trunk = self._find_trunk(trunk_name_or_id, ignore_missing=False) + port = self._find_port(port_name_or_id, ignore_missing=False) + port_list = [{ + 'port_id': port.id, + 'segmentation_id': seg_id, + 'segmentation_type': seg_type + }] + return self.os_sdk_conn.network.add_trunk_subports(trunk.id, port_list) + + def _remove_trunk_subport(self, trunk_name_or_id, port_name_or_id): + trunk = self._find_trunk(trunk_name_or_id, ignore_missing=False) + port = self._find_port(port_name_or_id, ignore_missing=False) + port_list = [{'port_id': port.id}] + return self.os_sdk_conn.network.delete_trunk_subports(trunk.id, port_list) + + def _create_rbac_policy(self, action, network_id, target_tenant): + return self.os_sdk_conn.network.create_rbac_policy( + action=action, + object_id=network_id, + object_type="network", + target_tenant=target_tenant) + + def _delete_rbac_policy(self, policy_id): + return self.os_sdk_conn.network.delete_rbac_policy(policy_id) + + def _update_rbac_policy(self, policy_id, **args): + return self.os_sdk_conn.network.update_rbac_policy(policy_id, **args) + + def _list_rbac_policies(self): + return self.os_sdk_conn.network.rbac_policies() + + def _find_rbac_policy(self, policy_id, ignore_missing=True, **args): + return self.os_sdk_conn.network.find_rbac_policy(policy_id, ignore_missing=ignore_missing, **args) + + def _get_rbac_policy(self, policy_id): + return self.os_sdk_conn.network.get_rbac_policy(policy_id) + + def _create_security_group_rule(self, name_or_id, direction, protocol, ethertype, **attrs): + sg = self._find_security_group(name_or_id, ignore_missing=False) + return self.os_sdk_conn.network.create_security_group_rule(security_group_id=sg.id, direction=direction, protocol=protocol, ethertype=ethertype, **attrs) + + def _delete_security_group_rule(self, rule_id): + return self.os_sdk_conn.network.delete_security_group_rule(rule_id) + + def _list_security_group_rules(self, sg_id): + return self.os_sdk_conn.network.security_group_rules(security_group_id=sg_id) + + def _find_security_group_rule(self, name_or_id, ignore_missing=True, **args): + return self.os_sdk_conn.network.find_security_group_rule(name_or_id, ignore_missing=ignore_missing, **args) + + def _get_security_group_rule(self, sg_id): + return self.os_sdk_conn.network.get_security_group_rule(sg_id) diff --git a/enhanced-policies/tests/test_neutron/test_rbac_neutron.py b/enhanced-policies/tests/test_neutron/test_rbac_neutron.py new file mode 100644 index 00000000..e6a89778 --- /dev/null +++ b/enhanced-policies/tests/test_neutron/test_rbac_neutron.py @@ -0,0 +1,1517 @@ +# +# Copyright (c) 2021 Wind River Systems, Inc. +# +# SPDX-License-Identifier: Apache-2.0 +# +# All Rights Reserved. +# + +import pytest +import netaddr + +from openstack import exceptions +from tests.fv_rbac import OpenStackRouterInterface +from tests.test_neutron.rbac_neutron import OpenStackNetworkingTesting + + +@pytest.fixture(scope='class', autouse=True) +def networking_setup(request, network_admin_setup): + + cfg = network_admin_setup + + request.cls.os_sdk_admin_conn = cfg.os_sdk_admin_conn + request.cls.users = cfg.users + + request.cls.user02 = cfg.user02 + request.cls.user11 = cfg.user11 + request.cls.user12 = cfg.user12 + request.cls.user13 = cfg.user13 + + request.cls.env = cfg.env + + +class TestNetworking(OpenStackNetworkingTesting): + def test_uc_network_1(self, tc_teardown): + """ + 1. user11 tries to create dedicated network 'network11', and tries to + create shared network 'network12', should succeed; + 2. user11 tries to create external network 'extnet11', should fail; + 3. user11/12/13 tries to get list and detail of networks created in + step 1. should succeed; + 4. user11 tries to update networks created in step1,should succeed; + 5. user12/13 tries to update/delete networks created in step1,should + fail; + 6. user11 tries to delete networks created in step1,should succeed; + 7. user12/13 tries to create networks mentioned in step1,should fail; + """ + + print ("\nTC-1") + + self.set_connections_for_user(self.user11) + + network11 = self._create_network("network11") + assert network11 is not None + network12 = self._create_network("network12", shared=True) + assert network12 is not None + + args = {'router:external': True} + with pytest.raises(exceptions.HttpException) as err: + assert self._create_network("extnet11", **args) + assert err.match("HttpException: 403") + + for user in [self.user11, self.user12, self.user13]: + self.set_connections_for_user(user) + + networks = self._list_networks() + + for name in ["network11", "network12"]: + assert name in [n.name for n in networks] + network = self._get_network(name) + assert network is not None + + self.set_connections_for_user(self.user11) + for name in ["network11", "network12"]: + network = self._get_network(name) + port_security_enabled = not network.is_port_security_enabled + args = {'port_security_enabled': port_security_enabled} + self._update_network(name, **args) + network = self._get_network(name) + assert network.is_port_security_enabled == port_security_enabled + + for user in [self.user12, self.user13]: + self.set_connections_for_user(user) + for name in ["network11", "network12"]: + network = self._get_network(name) + port_security_enabled = not network.is_port_security_enabled + + with pytest.raises(exceptions.HttpException) as err: + args = {'port_security_enabled': port_security_enabled} + assert self._update_network(name, **args) + assert err.match("HttpException: 403") + + network = self._get_network(name) + assert network.is_port_security_enabled != port_security_enabled + + with pytest.raises(exceptions.HttpException) as err: + assert self._delete_network(name) + assert err.match("HttpException: 403") + + self.set_connections_for_user(self.user11) + for name in ["network11", "network12"]: + self._delete_network(name) + assert name not in [n.name for n in self._list_networks()] + + for user in [self.user12, self.user13]: + self.set_connections_for_user(user) + + with pytest.raises(exceptions.HttpException) as err: + assert self._create_network("network11") + assert err.match("HttpException: 403") + + with pytest.raises(exceptions.HttpException) as err: + assert self._create_network("network12", shared=True) + assert err.match("HttpException: 403") + + def test_uc_network_3(self, tc_teardown): + """ + 1. user02 create dedicated network 'network21' and shared network + 'network22', external network 'extnet21' + 2. user11/12/13 tries to get list and detail of networks, should + succeed to get 'network22' and extnet21, should fail to get + 'network21'; + 3. user11/12/13 tries to update/delete 'network22', 'extnet21', should + fail; + """ + + print ("\nTC-3") + + self.set_connections_for_user(self.user02) + + network21 = self._create_network("network21") + assert network21 is not None + network22 = self._create_network("network22", shared=True) + assert network22 is not None + args = {'router:external': True} + extnet21 = self._create_network("extnet21", shared=True, **args) + assert extnet21 is not None + + for user in [self.user11, self.user12, self.user13]: + self.set_connections_for_user(user) + for name in ["network21", "network22", "extnet21"]: + if name == "network21": + assert (name not in [n.name for n in self._list_networks()]) + assert (self._find_network(name) is None) + else: + assert name in [n.name for n in self._list_networks()] + network = self._get_network(name) + assert network is not None + + for user in [self.user11, self.user12, self.user13]: + self.set_connections_for_user(user) + for name in ["network22", "extnet21"]: + network = self._get_network(name) + port_security_enabled = not network.is_port_security_enabled + with pytest.raises(exceptions.HttpException) as err: + args = {'port_security_enabled': port_security_enabled} + assert self._update_network(name, **args) + assert err.match("HttpException: 403") + + network = self._get_network(name) + assert network.is_port_security_enabled != port_security_enabled + + with pytest.raises(exceptions.HttpException) as err: + assert self._delete_network(name) + assert err.match("HttpException: 403") + + def test_uc_subnet_1(self, tc_teardown): + """ + 1. user11 tries to create dedicated network 'network11' and subnet + 'subnet11' for it, tries to create shared network and subnet: + 'network12' and 'subnet12', should succeed; + 2. user11/12/13 tries to get list and detail of subnets: 'subnet11' + and 'subnet12', should succeed; + 3. user11 tries to update subnets 'subnet11' and 'subnet12', should + succeed; + 4. user12/13 tries to update/delete subnets 'subnet11' and 'subnet12', + should fail; + 5. user11 tries to delete subnets 'subnet11' and 'subnet12',should + succeed; + 6. user12/13 tries to create subnet 'subnet13' for network 'network21', + should fail; + """ + + print ("\nTC-4") + + self.set_connections_for_user(self.user11) + + network11 = self._create_network("network11") + assert network11 is not None + + subnet = self._create_subnet("subnet11", "network11", + cidr="192.168.195.0/24", + gateway_ip="192.168.195.1") + assert "subnet11" in [s.name for s in self._list_subnets()] + + network12 = self._create_network("network12", shared=True) + assert network12 is not None + + subnet = self._create_subnet("subnet12", "network12", + cidr="192.168.196.0/24", + gateway_ip="192.168.196.1") + assert "subnet12" in [s.name for s in self._list_subnets()] + + for user in [self.user11, self.user12, self.user13]: + self.set_connections_for_user(user) + for name in ["subnet11", "subnet12"]: + assert name in [s.name for s in self._list_subnets()] + subnet = self._get_subnet(name) + assert subnet is not None + + self.set_connections_for_user(self.user11) + for name in ["subnet11", "subnet12"]: + subnet = self._get_subnet(name) + new_dhcp_enabled = not subnet.is_dhcp_enabled + args = {'enable_dhcp': new_dhcp_enabled} + self._update_subnet(name, **args) + subnet = self._get_subnet(name) + assert subnet.is_dhcp_enabled == new_dhcp_enabled + + for user in [self.user12, self.user13]: + self.set_connections_for_user(user) + for name in ["subnet11", "subnet12"]: + subnet = self._get_subnet(name) + new_dhcp_enabled = not subnet.is_dhcp_enabled + + with pytest.raises(exceptions.HttpException) as err: + assert self._update_subnet(name, **args) + assert err.match("HttpException: 403") + + subnet = self._get_subnet(name) + assert subnet.is_dhcp_enabled != new_dhcp_enabled + + with pytest.raises(exceptions.HttpException) as err: + assert self._delete_subnet(name) + assert err.match("HttpException: 403") + + self.set_connections_for_user(self.user11) + for name in ["subnet11", "subnet12"]: + self._delete_subnet(name) + assert self._find_subnet(name) is None + + for user in [self.user12, self.user13]: + self.set_connections_for_user(user) + + with pytest.raises(exceptions.HttpException) as err: + self._create_subnet("subnet13", "network12", + cidr="192.168.197.0/24", gateway_ip="192.168.197.1") + assert err.match("HttpException: 403") + + subnets = self._list_subnets() + assert "subnet13" not in [s.name for s in subnets] + + def test_uc_subnet_2(self, tc_teardown): + """ + 1. user02 creates network21 and subnet21 for project2, and creates + shared network22 and subnet22 for project2, + 2. user11/12/13 tries to get list and detail of subnets, should succeed + to get 'subnet22,', should fail to get 'subnet21'; + 3. user11/12/13 tries to update/delete subnets: subnet21 and subnet22, + should fail + """ + + print ("\nTC-5") + + self.set_connections_for_user(self.user02) + + network = self._create_network("network21") + assert network is not None + + subnet21 = self._create_subnet("subnet21", "network21", + enable_dhcp=False, + cidr="192.168.195.0/24", + gateway_ip="192.168.195.1") + subnet21 = self._get_subnet("subnet21") + assert subnet21 is not None + + network = self._create_network("network22", shared=True) + assert network is not None + + subnet22 = self._create_subnet("subnet22", "network22", + enable_dhcp=False, + cidr="192.168.196.0/24", + gateway_ip="192.168.196.1") + subnet22 = self._get_subnet("subnet22") + assert subnet22 is not None + + for user in [self.user11, self.user12, self.user13]: + self.set_connections_for_user(user) + + subnets = self._list_subnets() + assert ("subnet21" not in [s.name for s in subnets]) + assert self._find_subnet("subnet21") is None + + subnets = self._list_subnets() + assert ("subnet22" in [s.name for s in subnets]) + subnet = self._get_subnet("subnet22") + assert subnet is not None + + for user in [self.user11, self.user12, self.user13]: + self.set_connections_for_user(user) + + args = {'enable_dhcp': True} + with pytest.raises(exceptions.ResourceNotFound): + assert self._update_subnet(subnet21.id, **args) + + with pytest.raises(exceptions.ResourceNotFound): + self._delete_subnet(subnet21.id) + + with pytest.raises(exceptions.HttpException) as err: + assert self._update_subnet(subnet22.id, **args) + assert err.match("HttpException: 403") + + with pytest.raises(exceptions.HttpException) as err: + self._delete_subnet(subnet22.id) + assert err.match("HttpException: 403") + + def test_uc_ip_availabilities(self, tc_teardown): + """ + 1. user02 creates network21 and subnet21 for project2, and creates + shared 'network22' and 'subnet22' for project2, + 2. user11 create network and subnet: 'network11', 'subnet11' for + project1. + 3. user12/13 tries to get list and detail of network-ip-availabilities + of networks, should fail to get list and detail of + network-ip-availabilities of network11. network 21,network22 + """ + + print ("\nTC-6") + + self.set_connections_for_user(self.user02) + + network21 = self._create_network("network21") + assert network21 is not None + network22 = self._create_network("network22", shared=True) + assert network22 is not None + + self._create_subnet("subnet21", "network21", + cidr="192.168.195.0/24", + gateway_ip="192.168.195.1") + assert "subnet21" in [s.name for s in self._list_subnets()] + + self._create_subnet("subnet22", "network22", + cidr="192.168.196.0/24", + gateway_ip="192.168.196.1") + assert "subnet22" in [s.name for s in self._list_subnets()] + + self.set_connections_for_user(self.user11) + network = self._create_network("network11") + assert network is not None + + self._create_subnet("subnet11", "network11", + cidr="192.168.195.0/24", + gateway_ip="192.168.195.1") + assert "subnet11" in [s.name for s in self._list_subnets()] + + for user in [self.user12, self.user13]: + self.set_connections_for_user(user) + for name in ["network11", "network21", "network22"]: + ip_availabilities = self._list_ip_availabilities(name) + assert name not in [i.network_name for i in ip_availabilities] + assert self._find_ip_availability(name) is None + + def test_uc_subnetpool_1(self, tc_teardown): + """ + 1. user11 tries to create dedicated subnetpool 'subnetpool11', and tries + to create shared subnetpool 'subnetpool12',should succeed; + 2. user11/12/13 tries to get list and detail of subnetpools created in + step 1, should succeed; + 3. user11 tries to update subnetpools created in step1.should succeed; + 4. user12/13 tries to update/delete subnetpools created in step1.should + fail; + 5. user11 tries to delete subnetpools created in step1.should succeed; + 6. user12/13 tries to create subnetpools mentioned in step1.should fail; + """ + + print ("\nTC-7") + + self.set_connections_for_user(self.user11) + + subnetpool11 = self._create_subnetpool( + "subnetpool11", + prefixes=["192.168.0.0/16", "10.10.24.0/21"]) + assert subnetpool11 is not None + + subnetpool12 = self._create_subnetpool( + "subnetpool12", + prefixes=["192.169.0.0/16", "10.10.38.0/21"], + shared=True) + assert subnetpool12 is not None + + for user in [self.user11, self.user12, self.user13]: + self.set_connections_for_user(user) + + for name in ["subnetpool11", "subnetpool12"]: + subnetpools = self._list_subnetpools() + assert name in [s.name for s in subnetpools] + + subnetpool = self._get_subnetpool(name) + assert subnetpool is not None + + self.set_connections_for_user(self.user11) + for name in ["subnetpool11", "subnetpool12"]: + subnetpool = self._get_subnetpool(name) + new_max_prefixlen = subnetpool.maximum_prefix_length - 2 + + args = {'max_prefixlen': new_max_prefixlen} + self._update_subnetpool(name, **args) + + subnetpool = self._get_subnetpool(name) + assert subnetpool.maximum_prefix_length == new_max_prefixlen + + for user in [self.user12, self.user13]: + self.set_connections_for_user(user) + for name in ["subnetpool11", "subnetpool12"]: + subnetpool = self._get_subnetpool(name) + new_max_prefixlen = subnetpool.maximum_prefix_length + 2 + + args = {'max_prefixlen': new_max_prefixlen} + with pytest.raises(exceptions.HttpException) as err: + assert self._update_subnetpool(name, **args) + assert err.match("HttpException: 403") + + with pytest.raises(exceptions.HttpException) as err: + assert self._delete_subnetpool(name) + assert err.match("HttpException: 403") + + assert name in [s.name for s in self._list_subnetpools()] + + self.set_connections_for_user(self.user11) + for name in ["subnetpool11", "subnetpool12"]: + subnetpool = self._get_subnetpool(name) + self._delete_subnetpool(name) + assert name not in [s.name for s in self._list_subnetpools()] + + for user in [self.user12, self.user13]: + self.set_connections_for_user(user) + + with pytest.raises(exceptions.HttpException) as err: + assert self._create_subnetpool( + "subnetpool11", + prefixes=["192.168.0.0/16", "10.10.24.0/21"]) + + with pytest.raises(exceptions.HttpException) as err: + assert self._create_subnetpool( + "subnetpool12", + prefixes=["192.169.0.0/16", "10.10.38.0/21"], + shared=True) + + def test_uc_subnetpool_2(self, tc_teardown): + """ + 1. user02 create dedicated subnetpool 'subnetpool21' and shared + subnetpool 'subnetpool22' for project2, + 2. user11/12/13 tries to get list and detail of subnetpool, should + succeed to get 'subnetpool22' and fail to get 'subnetpool21', + 3. user11/12/13 tries to update/delete , 'subnetpool22', should fail; + """ + + print ("\nTC-8") + + self.set_connections_for_user(self.user02) + + subnetpool21 = self._create_subnetpool( + "subnetpool21", + prefixes=["192.168.0.0/16", "10.10.24.0/21"]) + assert subnetpool21 is not None + + subnetpool22 = self._create_subnetpool( + "subnetpool22", + prefixes=["192.169.0.0/16", "10.10.48.0/21"], + shared=True) + assert subnetpool22 is not None + + for user in [self.user11, self.user12, self.user13]: + self.set_connections_for_user(user) + + subnetpools = self._list_subnetpools() + assert "subnetpool21" not in [s.name for s in subnetpools] + assert self._find_subnetpool("subnetpool21") is None + + subnetpools = self._list_subnetpools() + assert "subnetpool22" in [s.name for s in subnetpools] + subnetpool = self._get_subnetpool("subnetpool22") + assert subnetpool is not None + new_max_prefixlen = subnetpool.maximum_prefix_length + 2 + + args = {'max_prefixlen': new_max_prefixlen} + with pytest.raises(exceptions.HttpException) as err: + assert self._update_subnetpool("subnetpool22", **args) + assert err.match("HttpException: 403") + + with pytest.raises(exceptions.HttpException) as err: + assert self._delete_subnetpool("subnetpool22") + assert err.match("HttpException: 403") + + def test_uc_addrscope_1(self, tc_teardown): + """ + 1. user11 tries to create dedicated address-scope 'addrscope11', and + tries to create shared subnetpool 'addrscope12', should succeed; + 2. user11/12/13 tries to get list and detail of address-scopes created + in step 1, should succeed; + 3. user11 tries to update address-scopes created in step1.should + succeed; + 4. user12/13 tries to update/delete address-scopes created in + step1.should fail; + 5. user11 tries to delete address-scopes created in step1.should + succeed; + 6. user12/13 tries to create address-scopes mentioned in step1.should + fail; + """ + + print ("\nTC-9") + + self.set_connections_for_user(self.user11) + + addrscope11 = self._create_addrscope("addrscope11") + assert addrscope11 is not None + + addrscope12 = self._create_addrscope("addrscope12", shared=True) + assert addrscope12 is not None + + for user in [self.user11, self.user12, self.user13]: + self.set_connections_for_user(user) + + for name in ["addrscope11", "addrscope12"]: + addrscopes = self._list_addrscopes() + assert name in [a.name for a in addrscopes] + + addrscope = self._get_addrscope(name) + assert addrscope is not None + + self.set_connections_for_user(self.user11) + for name in ["addrscope11", "addrscope12"]: + addrscope = self._get_addrscope(name) + new_name = name + "_new" + self._update_addrscope(name, new_name) + addrscope = self._get_addrscope(new_name) + assert addrscope is not None + + for user in [self.user12, self.user13]: + self.set_connections_for_user(user) + + for name in ["addrscope11_new", "addrscope12_new"]: + addrscope = self._get_addrscope(name) + new_name = name + "_new" + with pytest.raises(exceptions.HttpException) as err: + assert self._update_addrscope(name, new_name) + assert err.match("HttpException: 403") + + with pytest.raises(exceptions.HttpException) as err: + assert self._delete_addrscope(name) + assert err.match("HttpException: 403") + + self.set_connections_for_user(self.user11) + for name in ["addrscope11_new", "addrscope12_new"]: + addrscope = self._get_addrscope(name) + self._delete_addrscope(name) + assert name not in [a.name for a in self._list_addrscopes()] + + for user in [self.user12, self.user13]: + self.set_connections_for_user(user) + with pytest.raises(exceptions.HttpException) as err: + assert self._create_addrscope("addrscope11", shared=True) + assert err.match("HttpException: 403") + + with pytest.raises(exceptions.HttpException) as err: + assert self._create_addrscope("addrscope12", shared=True) + assert err.match("HttpException: 403") + + def test_uc_addrscope_2(self, tc_teardown): + """ + 1. user02 create dedicated address-scope 'addrscope21' and shared + address-scope 'addrscope22', + 2. user11/12/13 tries to get list and detail of address-scopes, should + succeed to get 'addrscope22' and fail to get 'addrscope21', + 3. user11/12/13 tries to update/delete 'addrscope22', should fail; + """ + + print ("\nTC-10") + + self.set_connections_for_user(self.user02) + + addrscope21 = self._create_addrscope("addrscope21") + assert addrscope21 is not None + + addrscope22 = self._create_addrscope("addrscope22", shared=True) + assert addrscope22 is not None + + for user in [self.user11, self.user12, self.user13]: + self.set_connections_for_user(user) + + assert "addrscope21" not in [a.name for a in self._list_addrscopes()] + assert self._find_addrscope("addrscope21") is None + + assert "addrscope22" in [s.name for s in self._list_addrscopes()] + addresscope = self._get_addrscope("addrscope22") + assert addresscope is not None + + with pytest.raises(exceptions.HttpException) as err: + assert self._update_addrscope("addrscope22", "addrscope22_new") + assert err.match("HttpException: 403") + + with pytest.raises(exceptions.HttpException) as err: + assert self._delete_addrscope("addrscope22") + assert err.match("HttpException: 403") + + def test_uc_router_1(self, create_external_network, tc_teardown): + """ + 1. user02 create external network and subnet: extnet21, subnet21, + 2. user11 creates network 'network11' and 'subnet11', then tries to + create router 'vr11', then tries to add router interface for vr11 + and 'network11',should succeed; + 3. user11/12/13 tries to get list and detail of routers created in + step 1, should succeed; + 4. user11 tries to update routers created in step1.should succeed; + 5. user12/13 tries to update/delete routers created in step1.should + fail; + 6. user11 tries to remove router interface and tries to delete routers + created in step1.should succeed; + 7. user12/13 tries to create routers mentioned in step1.should fail; + """ + + print ("\nTC-11") + + extnet, extsubnet = create_external_network + + self.set_connections_for_user(self.user11) + + network11 = self._create_network("network11") + assert network11 is not None + + subnet11 = self._create_subnet("subnet11", "network11", + cidr="10.10.20.0/24", gateway_ip="10.10.20.1") + assert subnet11 is not None + + vr11 = self._create_router("vr11", extnet.name) + + ri = OpenStackRouterInterface("vr11", "subnet11") + vr11 = self._add_interface_to_router(ri) + assert subnet11.id in vr11.get('subnet_ids') + + for user in [self.user11, self.user12, self.user13]: + self.set_connections_for_user(user) + + routers = self._list_routers() + assert "vr11" in [r.name for r in routers] + router = self._get_router("vr11") + assert router is not None + + self.set_connections_for_user(self.user11) + + vr = self._get_router("vr11") + descr = "Router VR11" + args = {'description': descr} + router = self._update_router("vr11", **args) + + vr = self._get_router("vr11") + assert vr.description == descr + + for user in [self.user12, self.user13]: + self.set_connections_for_user(user) + + descr = "Router VR11 " + user.get("name") + args = {'description': descr} + with pytest.raises(exceptions.HttpException) as err: + assert self._update_router("vr11", **args) + assert err.match("HttpException: 403") + + with pytest.raises(exceptions.HttpException) as err: + assert self._delete_router("vr11") + assert err.match("HttpException: 403") + + self.set_connections_for_user(self.user11) + vr11 = self._delete_interface_from_router(ri) + self._delete_router("vr11") + assert "vr11" not in [r.name for r in self._list_routers()] + + for user in [self.user12, self.user13]: + self.set_connections_for_user(user) + with pytest.raises(exceptions.HttpException) as err: + assert self._create_router("vr11", extnet.name) + assert err.match("HttpException: 403") + + def test_uc_router_2(self, create_router_vr21): + """ + 1. user02 creates external network 'extnet2', and subnet2 for + 'extnet2', then create 'vr21' + 2. user11/12/13 tries to get list or detail of router vr21.should fail; + """ + + print ("\nTC-12") + + vr21 = create_router_vr21 + assert vr21 is not None + + for user in [self.user11, self.user12, self.user13]: + self.set_connections_for_user(user) + + assert "vr21" not in [r.name for r in self._list_routers()] + assert self._find_router("vr21") is None + + # No teardown is needed, the router created by create_router_vr21 fixture will be used from now on + + def test_uc_floatingip_1(self, create_router_vr21, create_external_network, create_router_vr11, tc_teardown): + """ + 1. user02 create shared external network and subnet: extnet21, subnet21 + 2. user11 creates network 'network11' and 'subnet11', then create + router 'vr11', add router interface for vr11 and 'network11', + 3. user11 launches vm11 over 'network11',which will create a port + connecting to 'network11', + 4. user11 tries to create floatingip for vm11's port, should succeed; + 5. user12/13 tries to update/delete floatingip created in step4. should + fail; + 6. user11 update/delete floatingip created in step4. should succeed. + 7. user12/13 tries to create floatingip as step4. should fail; + """ + + print ("\nTC-13") + + extnet, extsubnet = create_external_network + + self.set_connections_for_user(self.user11) + + network11 = self._create_network("network11") + assert network11 is not None + + subnet11 = self._create_subnet("subnet11", "network11", cidr="10.10.20.0/24", gateway_ip="10.10.20.1") + assert subnet11 is not None + + vr11 = create_router_vr11 + + ri = OpenStackRouterInterface("vr11", "subnet11") + vr11 = self._add_interface_to_router(ri) + assert subnet11.id in vr11.get("subnet_ids") + + vm11 = self._create_server("vm11", "cirros", "m1.tiny", network_name="network11") + vm11 = self._get_server("vm11") + + network = self._get_network("network11") + vm11_port = [p for p in self._list_ports(device_id=vm11.id)][0] + assert vm11_port.network_id == network.id + + fip11 = self._create_floatingip(subnet_id=extsubnet.id, floating_network_id=extnet.id, port_id=vm11_port.id) + assert fip11 is not None + + fip11 = self._get_floatingip(fip11.name) + for user in [self.user12, self.user13]: + self.set_connections_for_user(user) + + new_descr = "Floating IP " + user.get("name") + with pytest.raises(exceptions.HttpException) as err: + assert self._update_floatingip(fip11.name, description=new_descr) + assert err.match("HttpException: 403") + + with pytest.raises(exceptions.HttpException) as err: + assert self._delete_floatingip(fip11.name) + assert err.match("HttpException: 403") + + self.set_connections_for_user(self.user11) + + new_descr = "Floating IP " + user.get("name") + self._update_floatingip(fip11.name, description=new_descr) + fip11 = self._get_floatingip(fip11.name) + assert fip11.description == new_descr + + self._delete_floatingip(fip11.name) + assert fip11.name not in [f.name for f in self._list_floatingips()] + + for user in [self.user12, self.user13]: + self.set_connections_for_user(user) + + with pytest.raises(exceptions.HttpException) as err: + assert self._create_floatingip(subnet_id=extsubnet.id, + floating_network_id=extnet.id, + port_id=vm11_port.id) + assert err.match("HttpException: 403") + + def test_uc_floatingip_2(self, create_external_network, create_router_vr21, tc_teardown): + """ + 1. user02 create shared external network and subnet: extnet21, subnet21 + 2. user10 creates network 'network21' and 'subnet21', then create + router 'vr21', add router interface for vr21 and 'network21', + 3. user02 launches vm21 over 'network21',which will create a port + connecting to 'network21', + 4. user02 create floatingip for vm21's port; + 5. user11/12/13 tries to get list and detail of floatingip in created + in step4. should fail; + """ + + print ("\nTC-14") + + self.set_connections_for_user(self.user02) + + extnet, extsubnet = create_external_network + + network21 = self._create_network("network21") + assert network21 is not None + networks = self._list_networks() + assert "network21" in [n.name for n in networks] + + vr21 = create_router_vr21 + + network = self._get_network("network21") + subnet21 = self._create_subnet("subnet21", "network21", cidr="10.10.20.0/24", gateway_ip="10.10.20.1") + assert subnet21 is not None + + ri = OpenStackRouterInterface("vr21", "subnet21") + vr21 = self._add_interface_to_router(ri) + assert subnet21.id in vr21.get("subnet_ids") + + vm21 = self._create_server("vm21", "cirros", "m1.tiny", network_name="network21") + vm21 = self._get_server("vm21") + + network = self._get_network("network21") + vm21_port = [p for p in self._list_ports(device_id=vm21.id)][0] + assert vm21_port.network_id == network.id + + fip21 = self._create_floatingip(subnet_id=extsubnet.id, floating_network_id=extnet.id, port_id=vm21_port.id) + assert fip21 is not None + + for user in [self.user11, self.user12, self.user13]: + self.set_connections_for_user(user) + + assert fip21.id not in [fip.id for fip in self._list_floatingips()] + assert self._find_floatingip(fip21.name) is None + + def test_uc_portforwarding_1(self, create_external_network, create_router_vr11, tc_teardown): + """ + https://docs.starlingx.io/api-ref/stx-docs/api-ref-networking-v2-cgcs-ext.html#port-forwarding + + 1. user02 create external network and subnet: extnet21, subnet21, + 2. user11 creates network 'network11' and 'subnet11', then create router + 'vr11', add router interface for vr11 and 'network11', + 3. user11 launches vm11 over 'network11',which will create a port + connecting to 'network11', + 4. user11 tries to create floatingip for vm11's port, should succeed; + 5. user11 tries to create portforwarding for floatingip created on + step4, should succeed; + 6. user11 tries to update the portforwarding created in step5, should + succeed; + 7. user12/13 tries to update/delete portforwarding created in step5, + should fail; + 8. user11 delete portforwarding created in step5, should succeed. + 9. user12/13 tries to create portforwarding as step5, should fail; + """ + + print ("\nTC-15") + + extnet, extsubnet = create_external_network + + self.set_connections_for_user(self.user11) + + network11 = self._create_network("network11") + assert network11 is not None + + subnet = self._create_subnet("subnet11", "network11", cidr="10.10.20.0/24", gateway_ip="10.10.20.1") + assert subnet is not None + + vr11 = create_router_vr11 + + ri = OpenStackRouterInterface("vr11", "subnet11") + vr11 = self._add_interface_to_router(ri) + assert subnet.id in vr11.get("subnet_ids") + + vm11 = self._create_server("vm11", "cirros", "m1.tiny", network_name="network11") + vm11 = self._get_server("vm11") + + network = self._get_network("network11") + vm11_port = [p for p in self._list_ports(device_id=vm11.id)][0] + assert vm11_port.network_id == network.id + + fip11 = self._create_floatingip(subnet_id=extsubnet.id, floating_network_id=extnet.id) + assert fip11 is not None + + vm_ip_address = None + for ip in vm11_port.fixed_ips: + if ip.get('subnet_id') == subnet.id: + vm_ip_address = ip.get('ip_address') + + fip = self._get_floatingip(fip11.name) + fip_pf = self._create_portforwarding( + fip.id, + protocol="tcp", + internal_ip_address=vm_ip_address, + internal_port=25, + internal_port_id=vm11_port.id, + external_port=2230) + assert fip_pf is not None + + fip_pf = self._get_portforwarding(fip_pf.id, fip.id) + new_ext_port = fip_pf.external_port + 10 + fip_pf = self._update_portforwarding(fip_pf.id, fip.id, external_port=new_ext_port) + assert fip_pf is not None + fip_pf = self._get_portforwarding(fip_pf.id, fip.id) + assert fip_pf.external_port == new_ext_port + + for user in [self.user12, self.user13]: + self.set_connections_for_user(user) + + fip_pf = self._get_portforwarding(fip_pf.id, fip.id) + new_ext_port = fip_pf.external_port + 10 + with pytest.raises(exceptions.HttpException) as err: + assert self._update_portforwarding(fip_pf.id, fip.id, external_port=new_ext_port) + assert err.match("HttpException: 403") + + with pytest.raises(exceptions.HttpException) as err: + assert self._delete_portforwarding(fip_pf.id, fip.id) + assert err.match("HttpException: 403") + + self.set_connections_for_user(self.user11) + self._delete_portforwarding(fip_pf.id, fip.id) + port_forwardings = self._list_portforwarding(fip.id) + assert fip_pf.id not in [f.id for f in port_forwardings] + + # FIXME: not throwing exceptions and allowing the operation + for user in [self.user12, self.user13]: + self.set_connections_for_user(user) + + with pytest.raises(exceptions.HttpException) as err: + assert self._create_portforwarding( + fip.id, + protocol="tcp", + internal_ip_address=vm_ip_address, + internal_port=25, + internal_port_id=vm11_port.id, + external_port=2230) + assert err.match("HttpException: 403") + + def test_uc_portforwarding_2(self, create_external_network, create_router_vr21, tc_teardown): + """ + https://docs.starlingx.io/api-ref/stx-docs/api-ref-networking-v2-cgcs-ext.html#port-forwarding + + 1. user02 create external network and subnet: extnet21, subnet21, + 2. user10 creates network 'network21' and 'subnet21', then create router + 'vr21', add router interface for vr21 and 'network21', + 3. user02 launches vm21 over 'network21',which will create a port + connecting to 'network21', + 4. user02 creates floatingip for vm21's port, should succeed; + 5. user02 creates portforwarding for floatingip created on step4; + 6. user11/12/13 tries to get list and detail of portfowarding in created + in step5, should fail; + """ + + print ("\nTC-16") + + extnet, extsubnet = create_external_network + + self.set_connections_for_user(self.user02) + + network21 = self._create_network("network21") + assert network21 is not None + networks = self._list_networks() + assert "network21" in [n.name for n in networks] + + network = self._get_network("network21") + subnet22 = self._create_subnet("subnet21", "network21", cidr="10.10.20.0/24", gateway_ip="10.10.20.1") + assert subnet22 is not None + + vr21 = create_router_vr21 + subnet = self._get_subnet("subnet21") + ri = OpenStackRouterInterface("vr21", "subnet21") + vr21 = self._add_interface_to_router(ri) + assert subnet.id in vr21.get("subnet_ids") + + vm21 = self._create_server("vm21", "cirros", "m1.tiny", network_name="network21") + vm21 = self._get_server("vm21") + + network = self._get_network("network21") + vm21_port = [p for p in self._list_ports(device_id=vm21.id)][0] + assert vm21_port.network_id == network.id + + fip21 = self._create_floatingip(subnet_id=extsubnet.id, floating_network_id=extnet.id) + assert fip21 is not None + + vm_ip_address = None + for ip in vm21_port.fixed_ips: + if ip.get('subnet_id') == subnet.id: + vm_ip_address = ip.get('ip_address') + + fip = self._get_floatingip(fip21.name) + fip_pf21 = self._create_portforwarding(fip.id, + protocol="tcp", + internal_ip_address=vm_ip_address, + internal_port=25, + internal_port_id=vm21_port.id, + external_port=2250) + assert fip_pf21 is not None + + fip_pf = self._get_portforwarding(fip_pf21.id, fip.id) + new_ext_port = fip_pf.external_port + 10 + fip_pf21 = self._update_portforwarding(fip_pf.id, fip.id, external_port=new_ext_port) + fip_pf = self._get_portforwarding(fip_pf21.id, fip.id) + assert fip_pf.external_port == new_ext_port + + for user in [self.user11, self.user12, self.user13]: + self.set_connections_for_user(user) + + with pytest.raises(exceptions.ResourceNotFound): + port_forwardings = self._list_portforwarding(fip.id) + assert fip_pf.id not in [pf.id for pf in port_forwardings] + + with pytest.raises(exceptions.ResourceNotFound): + assert self._get_portforwarding(fip_pf.id, fip.id) + + self.set_connections_for_user(self.user02) + self._delete_portforwarding(fip_pf.id, fip.id) + port_forwardings = self._list_portforwarding(fip.id) + assert fip_pf.id not in [f.id for f in port_forwardings] + + @pytest.mark.parametrize('active_user', [ + ('user11'), + ('user12') + ]) + def test_uc_port_1(self, create_external_network, active_user, tc_teardown): + """ + 1. user02 create shared external network and subnet: extnet21, + extsubnetnet21; + 2. user11 create network and subnet: network11. subnet11; + 3. user11 create shared network and subnet: network12, subnet12; + 4. user11/12 tries to create port 'extport21' for extnet21. port11 for + network11, port12 for network12, should succeed; + 5. user11/12/13 tries to get list and detail of ports, should get + 'extport21', 'port11', 'port12'; + 6. user13 tries to update/delete ports: extport11. port21, port12, + should fail; + 7. user11/12 tries to update/delete ports: extport11, port21, port12, + should succeed; + """ + + print ("\nTC-17") + + self.set_connections_for_user(self.user02) + + extnet, extsubnet = create_external_network + + self.set_connections_for_user(self.user11) + + network11 = self._create_network("network11") + assert network11 is not None + + subnet11 = self._create_subnet("subnet11", "network11", + cidr="10.10.20.0/24", gateway_ip="10.10.20.1") + assert subnet11 is not None + + network12 = self._create_network("network12", shared=True) + assert network12 is not None + + subnet12 = self._create_subnet("subnet12", "network12", + cidr="10.10.20.0/24", gateway_ip="10.10.20.1") + assert subnet12 is not None + + if active_user == "user11": + self.set_connections_for_user(self.user11) + else: + self.set_connections_for_user(self.user12) + for network_name, port_name in [("extnet21", "extport21"), + ("network11", "port11"), + ("network12", "port12")]: + + port = self._create_port(port_name, network_name) + assert port is not None + + for user in [self.user11, self.user12, self.user13]: + self.set_connections_for_user(user) + + port_names = [p.name for p in self._list_ports()] + for port_name in ["extport21", "port11", "port12"]: + assert port_name in port_names + + port = self._get_port(port_name) + assert port is not None + + self.set_connections_for_user(self.user13) + for port_name in ["extport21", "port11", "port12"]: + new_descr = user.get("name") + " " + port_name + with pytest.raises(exceptions.HttpException) as err: + assert self._update_port(port_name, description=new_descr) + assert err.match("HttpException: 403") + + with pytest.raises(exceptions.HttpException) as err: + assert self._delete_port(port_name) + assert err.match("HttpException: 403") + + if active_user == "user11": + self.set_connections_for_user(self.user11) + else: + self.set_connections_for_user(self.user12) + + for port_name in ["extport21", "port11", "port12"]: + new_descr = user.get("name") + " " + port_name + self._update_port(port_name, description=new_descr) + port = self._get_port(port_name) + assert port.description == new_descr + + self._delete_port(port.id) + assert port_name not in [p.name for p in self._list_ports()] + + def test_uc_port_2(self, create_external_network, tc_teardown): + """ + 1. user02 create shared external network and subnet: extnet21, + extsubnetnet21; + 2. user02 create network and subnet: network21. subnet21; + 3. user02 create shared network and subnet: network22. subnet22; + 4. user02 create port 'extport21' for extnet21. port21 for network21, + port22 for network22; + 5. user11/12/13 tries to get list and detail of ports, should get + 'extport21', 'port21', 'port22'; should fail; + """ + + print ("\nTC-18") + + self.set_connections_for_user(self.user02) + + extnet, extsubnet = create_external_network + + network = self._create_network("network21") + assert network is not None + + self._create_subnet("subnet21", "network21", + enable_dhcp=False, + cidr="10.10.20.0/24", + gateway_ip="10.10.20.1") + assert "subnet21" in [s.name for s in self._list_subnets()] + + network = self._create_network("network22", shared=True) + assert network is not None + + self._create_subnet("subnet22", "network22", + enable_dhcp=False, + cidr="10.10.30.0/24", + gateway_ip="10.10.30.1") + assert "subnet22" in [s.name for s in self._list_subnets()] + + for network_name, port_name in [("extnet21", "extport21"), + ("network21", "port21"), + ("network22", "port22")]: + + port = self._create_port(port_name, network_name) + assert port is not None + + for user in [self.user11, self.user12, self.user13]: + self.set_connections_for_user(user) + + port_names = [p.name for p in self._list_ports()] + for port_name in ["extport21", "port21", "port22"]: + assert port_name not in port_names + assert self._find_port(port_name) is None + + def test_uc_trunk_1(self, tc_teardown): + """ + 1. user11 creates networks and subnets: 'network11'/'subnet11', + 'network12'/'subnet12', 'network13'/'subnet13'; + 2. user11 creates parent-port 'parentport11' over 'network11', creates + subport 'subport12' over 'network12', creates subport 'subport13' + over 'network13'; + 3. user11 tries to create trunk 'trunk11' with 'parentport11' and + 'subport12', should succeed; + 4. user11 tries to set 'subport13' to 'trunk11', should succeed; + 5. user11/12/13 tries to get list and detail of trunk 'trunk11', should + succeed; + 6. user11/12/13 tries to get list and detail of trunk subport12. + subport13, should succeed; + 7. user12/13 tries to remove subport13 from trunk11. should fail; + 8. user11 tries to remove subport13 from trunk11. should succeed; + 9. user12/13 tries to update/delete 'trunk11', should fail; + 10. user11 tries to update/delete trunk11, should succeed; + 11. user11 tries to update/delete 'subport12', 'subport13', should + succeed; + """ + + print ("\nTC-19") + + self.set_connections_for_user(self.user11) + + ip_addr = "10.10.20.0" + for network_name, subnet_name in [("network11", "subnet11"), + ("network12", "subnet12"), + ("network13", "subnet13")]: + network = self._create_network(network_name) + assert network is not None + self._create_subnet(subnet_name, network_name, + cidr=ip_addr+"/24", + gateway_ip=str(netaddr.IPAddress(ip_addr) + 1)) + assert "subnet11" in [s.name for s in self._list_subnets()] + ip_addr = str(netaddr.IPAddress(ip_addr) + 256) + + for network_name, port_name in [("network11", "parentport11"), + ("network12", "subport12"), + ("network13", "subport13")]: + port = self._create_port(port_name, network_name) + assert port is not None + + subport12 = self._get_port('subport12') + subport13 = self._get_port('subport13') + + trunk11 = self._create_trunk("trunk11", "parentport11", + sub_ports=[{ + "port_id": subport12.id, + "segmentation_id": 11, + "segmentation_type": "vlan" + }]) + assert trunk11 is not None + + trunk = self._get_trunk("trunk11") + self._add_trunk_subport("trunk11", "subport13", 12, "vlan") + subports = self._get_trunk_subports("trunk11") + assert subport13.id in [s.get('port_id') for s in subports] + + for user in [self.user11, self.user12, self.user13]: + self.set_connections_for_user(user) + + trunks = self._list_trunks() + assert "trunk11" in [t.name for t in trunks] + + trunk = self._get_trunk("trunk11") + assert trunk is not None + + subports = self._get_trunk_subports("trunk11") + for subport_name in ["subport12", "subport13"]: + subport = self._get_port(subport_name) + assert subport is not None + assert subport.id in [s.get('port_id') for s in subports] + + subport13 = self._get_port("subport13") + for user in [self.user12, self.user13]: + self.set_connections_for_user(user) + with pytest.raises(exceptions.HttpException) as err: + assert self._remove_trunk_subport("trunk11", "subport13") + assert err.match("HttpException: 403") + subports = self._get_trunk_subports("trunk11") + assert subport13.id in [s.get('port_id') for s in subports] + + self.set_connections_for_user(self.user11) + self._remove_trunk_subport("trunk11", "subport13") + subports = self._get_trunk_subports("trunk11") + assert subport13.id not in [s.get('port_id') for s in subports] + + for user in [self.user12, self.user13]: + self.set_connections_for_user(user) + new_descr = "Trunk " + user.get("name") + with pytest.raises(exceptions.HttpException) as err: + assert self._update_trunk("trunk11", + description=new_descr) + assert err.match("HttpException: 403") + + with pytest.raises(exceptions.HttpException) as err: + assert self._delete_trunk("trunk11") + assert err.match("HttpException: 403") + + self.set_connections_for_user(self.user11) + new_descr = "Trunk user11" + self._update_trunk("trunk11", description=new_descr) + trunk = self._get_trunk("trunk11") + assert trunk.description == new_descr + self._delete_trunk("trunk11") + assert trunk.name not in [t.name for t in self._list_trunks()] + + self.set_connections_for_user(self.user11) + for port_name in ["subport12", "subport13"]: + new_descr = user.get("name") + " " + port_name + self._update_port(port_name, description=new_descr) + port = self._get_port(port_name) + assert port.description == new_descr + + self._delete_port(port_name) + assert self._find_port(port_name) is None + + def test_uc_trunk_2(self, tc_teardown): + """ + 1. user02 creates networks and subnets: + 'network21'/'subnet21','network22'/'subnet22','network23'/'subnet23'; + 2. user02 creates parent-port 'parentport21' over 'network21', creates + subport 'subport22' over 'network22', creates subport 'subport23' + over 'network23'; + 3. user02 creates trunk 'trunk21' with 'parentport21' and 'subport22'; + 4. user11/12/13 tries to get list of trunk and subports creaetd in + step2 and setp3. should fail; + """ + + print ("\nTC-20") + + self.set_connections_for_user(self.user02) + + ip_addr = "10.10.20.0" + for network_name, subnet_name in [("network21", "subnet21"), + ("network22", "subnet22"), + ("network23", "subnet23")]: + network = self._create_network(network_name) + assert network is not None + subnet = self._create_subnet(subnet_name, network_name, + cidr=ip_addr+"/24", + gateway_ip=str(netaddr.IPAddress(ip_addr) + 1)) + assert subnet is not None + ip_addr = str(netaddr.IPAddress(ip_addr) + 256) + + for network_name, port_name in [("network21", "parentport21"), + ("network22", "subport22"), + ("network23", "subport23")]: + port = self._create_port(port_name, network_name) + assert port is not None + + subport22 = self._get_port('subport22') + + trunk21 = self._create_trunk("trunk21", "parentport21", + sub_ports=[{ + "port_id": subport22.id, + "segmentation_id": 11, + "segmentation_type": "vlan" + }]) + assert trunk21 is not None + + for user in [self.user11, self.user12, self.user13]: + self.set_connections_for_user(user) + assert "trunk21" not in [t.name for t in self._list_trunks()] + assert self._find_trunk("trunk21") is None + with pytest.raises(exceptions.NotFoundException): + assert not self._get_trunk_subports("trunk21") + + def test_uc_rbacpolicy_1(self, tc_teardown): + """ + 1. user11 creates network and subnet: 'network11' and 'subnet11'; + 2. user11 tries to create rbac-policy with type: network, should + succeed; + 3. user11/12/13 tries to get list and detail of rbac-poliy cretaed in + step2. should succeed; + 4. user12/13 tries to update/delete rback-policy created in step2. + should fail; + 5. user11 tries to update/delete rback-policy created in step2. should + succeed; + """ + + print ("\nTC-21") + + self.set_connections_for_user(self.user11) + + network = self._create_network("network11") + assert network is not None + + subnet = self._create_subnet("subnet11", "network11", + enable_dhcp=False, + cidr="192.168.195.0/24", + gateway_ip="192.168.195.1") + assert subnet is not None + + rbac_policy = self._create_rbac_policy( + action="access_as_shared", + network_id=network.id, + target_tenant='project1') + assert rbac_policy is not None + + for user in [self.user11, self.user12, self.user13]: + self.set_connections_for_user(user) + assert rbac_policy.id in [rp.id for rp in self._list_rbac_policies()] + rp = self._get_rbac_policy(rbac_policy.id) + assert rp is not None + + for user in [self.user12, self.user13]: + self.set_connections_for_user(user) + + with pytest.raises(exceptions.HttpException) as err: + assert self._update_rbac_policy(rbac_policy.id, + target_tenant="project2") + assert err.match("HttpException: 403") + + with pytest.raises(exceptions.HttpException) as err: + assert self._delete_rbac_policy(rbac_policy.id) + assert err.match("HttpException: 403") + + self.set_connections_for_user(self.user11) + self._update_rbac_policy(rbac_policy.id, target_tenant="project2") + rbac_policy = self._get_rbac_policy(rbac_policy.id) + # NOTE(myumi): target_project_id receives the target_tenant on + # openstacksdk (why?) + assert rbac_policy.target_project_id == "project2" + self._delete_rbac_policy(rbac_policy.id) + assert self._find_rbac_policy(rbac_policy.id) is None + + def test_uc_rbacpolicy_2(self, tc_teardown): + """ + 1. user02 creates network and subnet: 'network21', 'subnet21'; + 2. user02 creates rbac-policy with type: network; + 3. user11/12/13 tries to get list and detail of rback created in step2. + should fail; + """ + + print ("\nTC-22") + + self.set_connections_for_user(self.user02) + + network = self._create_network("network21") + assert network is not None + + self._create_subnet("subnet21", "network21", enable_dhcp=False, + cidr="192.168.195.0/24", gateway_ip="192.168.195.1") + assert "subnet21" in [s.name for s in self._list_subnets()] + + rbac_policy = self._create_rbac_policy( + action="access_as_shared", + network_id=network.id, + target_tenant='project2') + assert rbac_policy is not None + + for user in [self.user11, self.user12, self.user13]: + self.set_connections_for_user(user) + rbac_policies = self._list_rbac_policies() + assert rbac_policy.id not in [rp.id for rp in rbac_policies] + assert self._find_rbac_policy(rbac_policy.id) is None + + def test_uc_sg_1(self, tc_teardown): + """ + 1. user11 tries to create security-group 'sg11', should succeed; + 2. user11 tries to create securit-group-rule 'sgrule11' for 'sg11', + should succeed; + 3. user11/12/13 tries to get list and detail of security-groups: sg11. + should succeed; + 4. user11/12/13 tries to get list and detail of security-group-rule + created in step2. should succeed; + 5. user12/13 tries to delete security-group-rule created in step2. should + fail; + 6. user11 tries to delete security-group-rule created in step2. should + succeed; + 7. user12/13 tries to update/delete security-group-rule in step1. should + fail; + 8. user11 tries to update/delete security-group in step1. should succeed; + """ + + print ("\nTC-23") + + self.set_connections_for_user(self.user11) + + security_group = self._create_security_group(name="sg11") + assert security_group is not None + + rule = self._create_security_group_rule( + "sg11", + direction="ingress", + protocol="icmp", + ethertype="IPv4") + assert rule is not None + + for user in [self.user11, self.user12, self.user13]: + self.set_connections_for_user(user) + + assert self._get_security_group("sg11") is not None + assert "sg11" in [sg.name for sg in self._list_security_groups()] + + assert self._get_security_group_rule(rule.id) is not None + rules = self._list_security_group_rules(security_group.id) + assert "icmp" in [r.protocol for r in rules] + + for user in [self.user12, self.user13]: + self.set_connections_for_user(user) + + with pytest.raises(exceptions.HttpException) as err: + assert self._delete_security_group_rule(rule.id) + err.match("HttpException: 403") + + self.set_connections_for_user(self.user11) + self._delete_security_group_rule(rule.id) + rules = self._list_security_group_rules(security_group.id) + assert "icmp" not in [r.protocol for r in rules] + + for user in [self.user12, self.user13]: + self.set_connections_for_user(user) + + new_descr = user.get("name") + "'s security group" + with pytest.raises(exceptions.HttpException) as err: + assert self._update_security_group("sg11", description=new_descr) + assert err.match("HttpException: 403") + + with pytest.raises(exceptions.HttpException) as err: + self._delete_security_group("sg11") + assert err.match("HttpException: 403") + + self.set_connections_for_user(self.user11) + new_descr = user.get("name") + "'s security group" + self._update_security_group("sg11", description=new_descr) + sg = self._get_security_group("sg11") + assert sg.description == new_descr + + self._delete_security_group("sg11") + assert self._find_security_group("sg11") is None + + def test_uc_sg_2(self, tc_teardown): + """ + 1. user02 tries to create security-group 'sg21', should succeed; + 2. user02 tries to create securit-group-rule 'sgrule21' for 'sg11', + should succeed; + 3. user11/12/13 tries to get list and detail of security-groups: sg21. + should fail; + 4. user11/12/13 tries to get list and detail of security-group-rules + created in step2. should fail; + """ + + print ("\nTC-24") + + self.set_connections_for_user(self.user02) + + self._create_security_group(name="sg21") + security_group = self._get_security_group("sg21") + assert security_group is not None + + rule = self._create_security_group_rule( + "sg21", + protocol="icmp", + direction="ingress", + ethertype="IPv4") + assert rule is not None + + for user in [self.user11, self.user12, self.user13]: + self.set_connections_for_user(user) + + assert "sg21" not in [s.name for s in self._list_security_groups()] + assert self._find_security_group("sg21") is None + + rules = self._list_security_group_rules(security_group.id) + assert "icmp" not in [r.protocol for r in rules] + with pytest.raises(exceptions.ResourceNotFound): + assert self._get_security_group_rule(rule.id) is None