A Tempest plugin to test Neutron VMware NSX plugin.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

1410 lines
67 KiB

# Copyright 2018 VMware Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import re
import subprocess
import time
from oslo_log import log as logging
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import decorators
from tempest.lib import exceptions
from tempest import test
from vmware_nsx_tempest_plugin.common import constants
from vmware_nsx_tempest_plugin.lib import feature_manager
from vmware_nsx_tempest_plugin.services import nsxv_client
CONF = config.CONF
CONF.validation.auth_method = 'None'
LOG = logging.getLogger(__name__)
class TestFwaasV2Ops(feature_manager.FeatureManager):
@classmethod
def skip_checks(cls):
super(TestFwaasV2Ops, cls).skip_checks()
if not test.is_extension_enabled('fwaas_v2', 'network'):
msg = "Extension fwaas_v2 is not enabled."
raise cls.skipException(msg)
@classmethod
def setup_clients(cls):
super(TestFwaasV2Ops, cls).setup_clients()
cls.cmgr_adm = cls.get_client_manager('admin')
cls.rtr_client = cls.cmgr_adm.routers_client
cls.nw_client = cls.cmgr_adm.networks_client
cls.subnets_client = cls.cmgr_adm.subnets_client
cls.ports_client = cls.cmgr_adm.ports_client
cls.sec_rule_client = cls.cmgr_adm.security_group_rules_client
cls.sec_client = cls.cmgr_adm.security_groups_client
@classmethod
def resource_setup(cls):
super(TestFwaasV2Ops, cls).resource_setup()
if CONF.network.backend == "nsxv":
manager_ip = re.search(r"(\d{1,3}\.){3}\d{1,3}",
CONF.nsxv.manager_uri).group(0)
cls.vsm = nsxv_client.VSMClient(
manager_ip, CONF.nsxv.user, CONF.nsxv.password)
cls.public_network_cidr = CONF.network.public_network_cidr
def get_router_port(self, client, router_id):
"""List ports using admin creds """
device_owner = u'network:router_interface'
ports_list = client.list_ports(device_id=router_id,
device_owner=device_owner)
for port in ports_list['ports']:
port_info = client.show_port(port['id'])
return port_info['port']['id']
return None
def _create_router(self, router_name=None, admin_state_up=True,
external_network_id=None, enable_snat=None,
set_gateway=True, **kwargs):
ext_gw_info = {}
if external_network_id:
ext_gw_info['network_id'] = external_network_id
if enable_snat is not None:
ext_gw_info['enable_snat'] = enable_snat
if set_gateway:
body = self.routers_client.create_router(
name=router_name, external_gateway_info=ext_gw_info,
admin_state_up=admin_state_up, **kwargs)
else:
body = self.routers_client.create_router(
name=router_name,
admin_state_up=admin_state_up,
**kwargs)
router = body.get('router', body)
self.addCleanup(self._delete_router, router)
return router
def _delete_router(self, router):
body = self.ports_client.list_ports(device_id=router['id'])
interfaces = body['ports']
for i in interfaces:
test_utils.call_and_ignore_notfound_exc(
self.routers_client.remove_router_interface, router['id'],
subnet_id=i['fixed_ips'][0]['subnet_id'])
self.routers_client.delete_router(router['id'])
def _test_ping_from_external_network(self, fip_ip):
out = os.popen('ping -c 2 %s' % fip_ip).read().strip()
return out
def _add_static_route(self, vm1_real_ip, router):
r_ips = router['external_gateway_info']['external_fixed_ips']
r_ip = r_ips[0]['ip_address']
os.popen('route add -host %s gw %s' % (vm1_real_ip, r_ip))
def _test_tcp_port_from_external_network(self, fip_ip):
nc_cmd = subprocess.Popen(['nc -zvw01 192.166.1.2 22'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE, shell=True)
output1, output2 = nc_cmd.communicate()
return output2
def _check_firewall_rule_exists_at_backend(self, rules,
firewall_rule_name):
rule_not_created = False
fw_rule_name = [r[1] for r in firewall_rule_name]
rules = [r['name'] for r in rules]
for fw_rule in fw_rule_name:
fw_rule = 'Fwaas-' + fw_rule[:24]
if fw_rule in rules:
rule_created = True
else:
rule_not_created = True
if rule_not_created:
return False
if rule_created:
return True
def verify_connectivity_to_extvm(self, server1, traffic_result):
ip_address = server1['floating_ips'][0]['floating_ip_address']
ssh_src1 = self._get_remote_client(ip_address, username='cirros',
use_password=True)
ext_cidr = CONF.network.public_network_cidr.rsplit('.', 1)[0] + '.13'
ext_vm_ip = ext_cidr.rsplit('.', 1)[0] + '.13'
self.check_remote_connectivity(ssh_src1, ext_vm_ip,
should_succeed=traffic_result)
def _create_rules(self, rule_list):
fw_rules = []
for rule in rule_list:
protocol = rule['proto']
rule_name = data_utils.rand_name('fw-rule-' + str(protocol))
# Create firewall rule
kwargs = {'action': rule['action']}
if 'src' in rule.keys():
kwargs['source_ip_address'] = rule['src']
if 'dest' in rule.keys():
kwargs['destination_ip_address'] = rule['dest']
if 'src_port' in rule.keys():
kwargs['source_port'] = rule['src_port']
if 'dest_port' in rule.keys():
kwargs['destination_port'] = rule['dest_port']
if 'ip_version' in rule.keys():
kwargs['ip_version'] = rule['ip_version']
fw_rule = self.create_firewall_rule(
name=rule_name, protocol=protocol, project_id=self.project_id,
**kwargs)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.fwaas_v2_client.delete_firewall_v2_rule,
fw_rule['firewall_rule']['id'])
fw_rules.append((fw_rule, rule_name, protocol))
rules = []
for fw_rule in fw_rules:
show_rules = \
self.show_firewall_rule(fw_rule[0]['firewall_rule']['id'])
# Check firewall rule
self.assertEqual(show_rules.get('firewall_rule')['name'],
fw_rule[1])
self.assertEqual(show_rules.get('firewall_rule')['protocol'],
fw_rule[2])
rules.append((fw_rule[0]['firewall_rule']['id'], fw_rule[1]))
return rules
def _create_policy(self, name, rules):
rules = [r[0] for r in rules]
fw_policy = self.create_firewall_policy(name=name,
firewall_rules=rules,
project_id=self.project_id)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.fwaas_v2_client.delete_firewall_v2_policy,
fw_policy['firewall_policy']['id'])
return fw_policy['firewall_policy']['id']
def _verify_backend_rules(self, router_name, in_egress_rules,
expected=True):
edges = self.vsm.get_all_edges()
for edge in edges:
if router_name in edge['name']:
if edge['edgeType'] == 'gatewayServices':
edge_id = edge['id']
break
rules = self.vsm.get_edge_firewall_rules(edge_id)
fw_exist = self._check_firewall_rule_exists_at_backend(rules,
in_egress_rules)
self.assertEqual(expected, fw_exist)
def create_fw_basic_topo_port_per_protocol(self, ports=None, router=None,
ingr=None, egr=None):
# process ingress & egress parameters
if ingr['enable']:
ingress = True
in_rules = ingr['rules']
else:
ingress = False
ingress_policy_id = None
ingress_rules = None
if egr['enable']:
egress = True
egr_rules = egr['rules']
else:
egress = False
egress_policy_id = None
egress_rules = None
# Create rules, policies
if ingress and egress:
ingress_rules = self._create_rules(in_rules)
egress_rules = self._create_rules(egr_rules)
policy_name = data_utils.rand_name('fw-policy-ingr-')
ingress_policy_id = self._create_policy(policy_name, ingress_rules)
policy_name = data_utils.rand_name('fw-policy-egr-')
egress_policy_id = self._create_policy(policy_name, egress_rules)
elif ingress:
ingress_rules = self._create_rules(in_rules)
policy_name = data_utils.rand_name('fw-policy-ingr-')
ingress_policy_id = self._create_policy(policy_name, ingress_rules)
elif egress:
egress_rules = self._create_rules(egr_rules)
policy_name = data_utils.rand_name('fw-policy-egr-')
egress_policy_id = self._create_policy(policy_name, egress_rules)
if ports is None:
ports = []
group_name = data_utils.rand_name('fw-group-')
# Create firewall group
fw_group = self.create_firewall_group(
name=group_name,
ingress_firewall_policy_id=ingress_policy_id,
egress_firewall_policy_id=egress_policy_id,
ports=ports,
project_id=self.project_id)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.fwaas_v2_client.delete_firewall_v2_group,
fw_group["firewall_group"]["id"])
time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL)
if router:
if router['distributed'] or router['router_type'] == 'exclusive':
self._wait_firewall_ready(fw_group["firewall_group"]["id"])
show_group = \
self.show_firewall_group(fw_group["firewall_group"]["id"])
status = show_group.get('firewall_group')['status']
if (not ingress and not egress) or not ports:
self.assertEqual(status, 'INACTIVE')
else:
self.assertEqual(status, 'ACTIVE')
else: # this will check for shared router
show_group = \
self.show_firewall_group(fw_group["firewall_group"]["id"])
self.assertEqual(show_group.get('firewall_group')['status'],
'ERROR')
ingr = show_group.get('firewall_group')['ingress_firewall_policy_id']
egr = show_group.get('firewall_group')['egress_firewall_policy_id']
self.assertEqual(ingr, ingress_policy_id)
self.assertEqual(egr, egress_policy_id)
self.assertEqual(show_group.get('firewall_group')['ports'], ports)
if router['external_gateway_info']:
set_gw = True
else:
set_gw = False
if self.verify_on_backend:
if ports and set_gw:
if (ingress and egress):
self._verify_backend_rules(router['name'], ingress_rules)
self._verify_backend_rules(router['name'], egress_rules)
elif ingress:
self._verify_backend_rules(router['name'], ingress_rules)
elif egress:
self._verify_backend_rules(router['name'], egress_rules)
else:
self._verify_backend_rules(router['name'],
ingress_rules, False)
self._verify_backend_rules(router['name'],
egress_rules, False)
fw_topo = dict(ingress_rules=ingress_rules, egress_rules=egress_rules,
fw_ingr_policy=ingress_policy_id,
fw_egr_policy=egress_policy_id,
fw_group_id=fw_group["firewall_group"]["id"])
return fw_topo
def create_network_topo_with_two_rtr(self, router_type='exclusive',
create_instance=False,
enable_snat=True):
network = \
self.create_topology_network(network_name="fw-network",
networks_client=self.networks_client)
# Create router topo
if router_type == 'exclusive':
kwargs = {"router_type": "exclusive",
"admin_state_up": "True"}
self.verify_on_backend = True
elif router_type == 'distributed':
kwargs = {"distributed": "True",
"admin_state_up": "True"}
self.verify_on_backend = True
else:
kwargs = {"admin_state_up": "True"}
self.verify_on_backend = False
router1 = self._create_router(
router_name=data_utils.rand_name('router-default1'),
external_network_id=CONF.network.public_network_id,
enable_snat=enable_snat,
**kwargs)
router2 = self._create_router(
router_name=data_utils.rand_name('router-default2'),
external_network_id=CONF.network.public_network_id,
enable_snat=enable_snat,
**kwargs)
subnet_name = 'fw-subnet'
self.create_topology_subnet(subnet_name, network,
router_id=router1['id'])
self.create_topology_subnet(subnet_name, network,
router_id=router2['id'])
if create_instance:
vm1 = self.create_topology_instance(
"state_vm_1", [network],
create_floating_ip=enable_snat)
if enable_snat:
ips = self.topology_servers['state_vm_1']['floating_ips']
real_ip = ips[0]['fixed_ip_address']
float_ip = ips[0]['floating_ip_address']
else:
ips = self.topology_servers['state_vm_1']['addresses']
real_ip = ips[network['name']][0]['addr']
float_ip = None
else:
real_ip = None
float_ip = None
vm1 = None
p_client = self.ports_client
port1 = []
port1.append(self.get_router_port(p_client, router1['id']))
port2 = []
port2.append(self.get_router_port(p_client, router2['id']))
# verify backend verification
net_topo = dict(project=network['project_id'], router1=router1,
router2=router2, port1=port1, port2=port2,
vm1_real_ip=real_ip, vm1_float_ip=float_ip,
server=vm1)
return net_topo
def create_network_topo(self, router_type='exclusive',
create_instance=False, enable_snat=True,
set_gateway=True):
network = \
self.create_topology_network(network_name="fw-network",
networks_client=self.networks_client)
# Create router topo
if router_type == 'exclusive':
kwargs = {"router_type": "exclusive",
"admin_state_up": "True"}
self.verify_on_backend = True
elif router_type == 'distributed':
kwargs = {"distributed": "True",
"admin_state_up": "True"}
self.verify_on_backend = True
else:
kwargs = {"admin_state_up": "True"}
self.verify_on_backend = False
router = self._create_router(
router_name=data_utils.rand_name('router-default1'),
external_network_id=CONF.network.public_network_id,
enable_snat=enable_snat, set_gateway=set_gateway,
**kwargs)
subnet_name = 'fw-subnet'
self.create_topology_subnet(subnet_name, network,
router_id=router['id'])
if create_instance:
if set_gateway and enable_snat:
floating_ip = True
else:
floating_ip = False
vm1 = self.create_topology_instance(
"state_vm_1", [network],
create_floating_ip=floating_ip)
if set_gateway and enable_snat:
ips = self.topology_servers['state_vm_1']['floating_ips']
real_ip = ips[0]['fixed_ip_address']
float_ip = ips[0]['floating_ip_address']
else:
ips = self.topology_servers['state_vm_1']['addresses']
real_ip = ips[network['name']][0]['addr']
float_ip = None
else:
real_ip = None
float_ip = None
vm1 = None
p_client = self.ports_client
ports = []
ports.append(self.get_router_port(p_client, router['id']))
# verify backend verification
net_topo = dict(project=network['project_id'], router=router,
ports=ports, vm1_real_ip=real_ip,
vm1_float_ip=float_ip, server=vm1)
return net_topo
@decorators.attr(type='nsxv')
@decorators.idempotent_id('131288d7-9213-4b1e-b22d-15840c8e3f13')
def test_fwaas_group_attach_to_multiple_ports_of_router(self):
"""
Test fwaasv2 group attach to multiple ports of one router
"""
net_topo = self.create_network_topo(router_type='exclusive',
create_instance=True)
# Create another network and subnet and attach to router in net_topo
network2 = \
self.create_topology_network(network_name="fw-network2",
networks_client=self.networks_client)
router_id = net_topo['router']['id']
self.create_topology_subnet('subnet2', network2,
router_id=router_id,
networks_client=self.networks_client)
device_owner = u'network:router_interface'
ports_list = self.ports_client.list_ports(device_id=router_id,
device_owner=device_owner)
ports = []
for port in ports_list['ports']:
port_info = self.ports_client.show_port(port['id'])
ports.append(port_info['port']['id'])
self.project_id = net_topo['project']
ingr = {'enable': True,
'rules': [{'proto': 'icmp', 'action': 'allow'},
{'proto': 'tcp', 'action': 'allow'}
]}
egr = {'enable': True,
'rules': [{'proto': 'icmp', 'action': 'allow'},
{'proto': 'tcp', 'action': 'allow'}
]}
self.create_fw_basic_topo_port_per_protocol(ports,
net_topo['router'],
ingr, egr)
# verify int-VM to int-VM ping works
self.verify_connectivity_to_extvm(net_topo['server'],
traffic_result=True)
# verify ext-VM to intVM floating IP ping
out = self._test_ping_from_external_network(net_topo['vm1_float_ip'])
self.assertIn("2 received", str(out))
@decorators.attr(type='nsxv')
@decorators.idempotent_id('131288d7-9213-4b1e-a11d-15840c8e3f13')
def test_fwaas_in_egr_allow_tcp_icmp_default_ip_address(self):
"""
Test fwaasv2 api to create icmp/tcp rule with default ip as blankports
and check ingress/egress traffic.
"""
net_topo = self.create_network_topo(router_type='exclusive',
create_instance=True)
self.project_id = net_topo['project']
ports = net_topo['ports']
ingr = {'enable': True,
'rules': [{'proto': 'icmp', 'action': 'allow'},
{'proto': 'tcp', 'action': 'allow'}
]}
egr = {'enable': True,
'rules': [{'proto': 'icmp', 'action': 'allow'},
{'proto': 'tcp', 'action': 'allow'}
]}
fw_topo = \
self.create_fw_basic_topo_port_per_protocol(ports,
net_topo['router'],
ingr, egr)
# verify int-VM to int-VM ping works
self.verify_connectivity_to_extvm(net_topo['server'],
traffic_result=True)
# verify ext-VM to intVM floating IP ping
out = self._test_ping_from_external_network(net_topo['vm1_float_ip'])
self.assertIn("2 received", str(out))
# remove port from firewall group
self.fwaas_v2_client.update_firewall_v2_group(fw_topo['fw_group_id'],
ports=[])
self._wait_firewall_ready(fw_topo['fw_group_id'])
@decorators.attr(type='nsxv')
@decorators.idempotent_id('131288d7-9213-4b1e-a11d-15840c8e4f13')
def test_fwaas_in_egr_allow_tcp_icmp_ip_address_0_0_0_0(self):
"""
Test fwaasv2 api to create icmp/tcp rule with default ip as blankports
and check ingress/egress traffic.
"""
net_topo = self.create_network_topo(router_type='exclusive',
create_instance=True)
self.project_id = net_topo['project']
ports = net_topo['ports']
ingr = {'enable': True,
'rules': [{'proto': 'icmp', 'src': '0.0.0.0/0',
'dest': '0.0.0.0/0', 'action': 'allow'},
{'proto': 'tcp', 'src': '0.0.0.0/0',
'dest': '0.0.0.0/0', 'action': 'allow'}
]}
egr = {'enable': True,
'rules': [{'proto': 'icmp', 'src': '0.0.0.0/0',
'dest': '0.0.0.0/0', 'action': 'allow'},
{'proto': 'tcp', 'src': '0.0.0.0/0',
'dest': '0.0.0.0/0', 'action': 'allow'}
]}
fw_topo = \
self.create_fw_basic_topo_port_per_protocol(ports,
net_topo['router'],
ingr, egr)
# verify int-VM to int-VM ping works
self.verify_connectivity_to_extvm(net_topo['server'],
traffic_result=True)
# verify ext-VM to intVM floating IP ping
out = self._test_ping_from_external_network(net_topo['vm1_float_ip'])
self.assertIn("2 received", str(out))
# remove port from firewall group
self.fwaas_v2_client.update_firewall_v2_group(fw_topo['fw_group_id'],
ports=[])
self._wait_firewall_ready(fw_topo['fw_group_id'])
@decorators.attr(type='nsxv')
@decorators.idempotent_id('131288d7-9213-4b1e-a11d-15840c8e5f13')
def test_fwaas_in_egr_deny_tcp_icmp_ip_address_0_0_0_0(self):
"""
Test fwaasv2 api to create icmp/tcp rule with ip 0.0.0.0/0
and check ingress/egress traffic.
To check in to extVM traffic, we have to ssh to ingress rules
has tcp allowed
"""
net_topo = self.create_network_topo(router_type='exclusive',
create_instance=True)
self.project_id = net_topo['project']
ports = net_topo['ports']
ingr = {'enable': True,
'rules': [{'proto': 'icmp', 'src': '0.0.0.0/0',
'dest': '0.0.0.0/0', 'action': 'deny'},
{'proto': 'tcp', 'src': '0.0.0.0/0',
'dest': '0.0.0.0/0', 'action': 'allow'}
]}
egr = {'enable': True,
'rules': [{'proto': 'icmp', 'src': '0.0.0.0/0',
'dest': '0.0.0.0/0', 'action': 'deny'},
{'proto': 'tcp', 'src': '0.0.0.0/0',
'dest': '0.0.0.0/0', 'action': 'deny'}
]}
fw_topo = \
self.create_fw_basic_topo_port_per_protocol(ports,
net_topo['router'],
ingr, egr)
# verify int-VM to int-VM ping fails
self.verify_connectivity_to_extvm(net_topo['server'],
traffic_result=False)
# verify ext-VM to intVM floating IP ping fails
out = self._test_ping_from_external_network(net_topo['vm1_float_ip'])
self.assertIn("0 received", str(out))
# remove port from firewall group
self.fwaas_v2_client.update_firewall_v2_group(fw_topo['fw_group_id'],
ports=[])
self._wait_firewall_ready(fw_topo['fw_group_id'])
@decorators.attr(type='nsxv')
@decorators.idempotent_id('531388d7-9213-4b1e-a11d-15840c8e5f13')
def test_fwaas_ingress_only_allow_tcp_icmp_ip_address_0_0_0_0(self):
"""
Test fwaasv2 api to create icmp/tcp rule with ip 0.0.0.0/0
and check ingress/egress traffic.
To check in to extVM traffic, we have to ssh to ingress rules
has tcp allowed
ingress traffic should & egress should fail
"""
net_topo = self.create_network_topo(router_type='exclusive',
create_instance=True)
self.project_id = net_topo['project']
ports = net_topo['ports']
ingr = {'enable': True,
'rules': [{'proto': 'icmp', 'src': '0.0.0.0/0',
'dest': '0.0.0.0/0', 'action': 'allow'},
{'proto': 'tcp', 'src': '0.0.0.0/0',
'dest': '0.0.0.0/0', 'action': 'allow'}
]}
egr = {'enable': False,
'rules': [{'proto': 'icmp', 'src': '0.0.0.0/0',
'dest': '0.0.0.0/0', 'action': 'deny'},
{'proto': 'tcp', 'src': '0.0.0.0/0',
'dest': '0.0.0.0/0', 'action': 'deny'}
]}
fw_topo = \
self.create_fw_basic_topo_port_per_protocol(ports,
net_topo['router'],
ingr, egr)
# verify int-VM to int-VM ping fails
self.verify_connectivity_to_extvm(net_topo['server'],
traffic_result=False)
# verify ext-VM to intVM floating IP ping works
out = self._test_ping_from_external_network(net_topo['vm1_float_ip'])
self.assertIn("2 received", str(out))
# remove port from firewall group
self.fwaas_v2_client.update_firewall_v2_group(fw_topo['fw_group_id'],
ports=[])
self._wait_firewall_ready(fw_topo['fw_group_id'])
@decorators.attr(type='nsxv')
@decorators.idempotent_id('131488d7-9213-4b1e-a11d-15840c8e5f13')
def test_fwaas_egress_only_allow_tcp_icmp_ip_address_0_0_0_0(self):
"""
Test fwaasv2 api to create icmp/tcp rule with ip 0.0.0.0/0
with only egress.
ingress traffic should fail & cant check egress as egress as
ingress is stopped so could not ssh to internal VM
"""
net_topo = self.create_network_topo(router_type='exclusive',
create_instance=True)
self.project_id = net_topo['project']
ports = net_topo['ports']
ingr = {'enable': False,
'rules': [{'proto': 'icmp', 'src': '0.0.0.0/0',
'dest': '0.0.0.0/0', 'action': 'allow'},
{'proto': 'tcp', 'src': '0.0.0.0/0',
'dest': '0.0.0.0/0', 'action': 'allow'}
]}
egr = {'enable': True,
'rules': [{'proto': 'icmp', 'src': '0.0.0.0/0',
'dest': '0.0.0.0/0', 'action': 'allow'},
{'proto': 'tcp', 'src': '0.0.0.0/0',
'dest': '0.0.0.0/0', 'action': 'allow'}
]}
fw_topo = \
self.create_fw_basic_topo_port_per_protocol(ports,
net_topo['router'],
ingr, egr)
# verify ext-VM to intVM floating IP ping works
out = self._test_ping_from_external_network(net_topo['vm1_float_ip'])
self.assertIn("0 received", str(out))
# remove port from firewall group
self.fwaas_v2_client.update_firewall_v2_group(fw_topo['fw_group_id'],
ports=[])
self._wait_firewall_ready(fw_topo['fw_group_id'])
@decorators.attr(type='nsxv')
@decorators.idempotent_id('931488d7-9213-4b1e-a11d-15840c8e5f13')
def test_fwaas_ingress_egress_allow_tcp_icmp_subnet(self):
"""
Test fwaasv2 api to create icmp/tcp rule with ip real ip
egress traffic 123.0.0.0/24 -> 192.167.1.0/24 allow,
SNAT(123.0.0.0/24 192.167.1.10)->Egress
egress rulle should 0.0.0.0/0->192.167.1.0/24 allow
ingress traffic 192.167.1.13->192.167.1.100(fip),here egress->DNAT
192.167.1.0/24 -> 192.167.1.0/24 allow OR
0.0.0.0/0 -> 192.167.1.0/24 allow
ingress & egress traffic should work
"""
net_topo = self.create_network_topo(router_type='exclusive',
create_instance=True)
self.project_id = net_topo['project']
ports = net_topo['ports']
ingr = {'enable': True,
'rules': [{'proto': 'icmp', 'src': '0.0.0.0/0',
'dest': self.public_network_cidr,
'action': 'allow'},
{'proto': 'tcp', 'src': '0.0.0.0/0',
'dest': '0.0.0.0/0', 'action': 'allow'}
]}
egr = {'enable': True,
'rules': [{'proto': 'icmp', 'src': self.public_network_cidr,
'dest': '192.167.1.0/24', 'action': 'allow'}
]}
fw_topo = \
self.create_fw_basic_topo_port_per_protocol(ports,
net_topo['router'],
ingr, egr)
# verify int-VM to int-VM ping fails
self.verify_connectivity_to_extvm(net_topo['server'],
traffic_result=True)
# verify ext-VM to intVM floating IP ping works
out = self._test_ping_from_external_network(net_topo['vm1_float_ip'])
self.assertIn("2 received", str(out))
# remove port from firewall group
self.fwaas_v2_client.update_firewall_v2_group(fw_topo['fw_group_id'],
ports=[])
self._wait_firewall_ready(fw_topo['fw_group_id'])
@decorators.attr(type='nsxv')
@decorators.idempotent_id('931488d7-9213-4b1e-a11d-15840c8e5f14')
def test_fwaas_ingress_egress_allow_tcp_icmp_exactIP(self):
"""
Test fwaasv2 api to create icmp/tcp rule with exact IP
vm1(123.0.0.123/24)->extvm(192.167.1.13)
fip 192.167.1.123
egress rule 123.0.0.123 -> 192.167.1.13 allow,
ingress rule 192.167.1.13 -> 192.167.1.123(FIP)
ingress & egress traffic should work
"""
net_topo = self.create_network_topo(router_type='exclusive',
create_instance=True)
self.project_id = net_topo['project']
ports = net_topo['ports']
ext_ip = self.public_network_cidr.rsplit('.', 1)[0] + '.13'
ingr = {'enable': True,
'rules': [{'proto': 'icmp', 'src': ext_ip,
'dest': net_topo['vm1_float_ip'],
'action': 'allow'},
{'proto': 'tcp', 'src': '0.0.0.0/0',
'dest': '0.0.0.0/0', 'action': 'allow'}
]}
egr = {'enable': True,
'rules': [{'proto': 'icmp', 'src': net_topo['vm1_real_ip'],
'dest': ext_ip, 'action': 'allow'}
]}
fw_topo = \
self.create_fw_basic_topo_port_per_protocol(ports,
net_topo['router'],
ingr, egr)
# verify int-VM to int-VM ping fails
self.verify_connectivity_to_extvm(net_topo['server'],
traffic_result=True)
# verify ext-VM to intVM floating IP ping works
out = self._test_ping_from_external_network(net_topo['vm1_float_ip'])
self.assertIn("2 received", str(out))
# remove port from firewall group
self.fwaas_v2_client.update_firewall_v2_group(fw_topo['fw_group_id'],
ports=[])
self._wait_firewall_ready(fw_topo['fw_group_id'])
@decorators.attr(type='nsxv')
@decorators.idempotent_id('931488d7-9213-4b1e-a11d-15840c8e5f14')
def test_fwaas_ingress_egress_deny_icmp_exactIP(self):
"""
Test fwaasv2 api to create icmp/tcp rule with exact IP
vm1(123.0.0.123/24)->extvm(192.167.1.13)
fip 192.167.1.123
egress rule 123.0.0.123 -> 192.167.1.13 allow,
ingress rule 192.167.1.13 -> 192.167.1.123(FIP)
ingress & egress traffic should work
"""
net_topo = self.create_network_topo(router_type='exclusive',
create_instance=True)
self.project_id = net_topo['project']
ports = net_topo['ports']
ext_ip = self.public_network_cidr.rsplit('.', 1)[0] + '.13'
ingr = {'enable': True,
'rules': [{'proto': 'icmp', 'src': ext_ip,
'dest': net_topo['vm1_float_ip'], 'action': 'deny'},
{'proto': 'tcp', 'src': '0.0.0.0/0',
'dest': '0.0.0.0/0', 'action': 'allow'}
]}
egr = {'enable': True,
'rules': [{'proto': 'icmp', 'src': net_topo['vm1_real_ip'],
'dest':ext_ip, 'action': 'deny'}
]}
fw_topo = \
self.create_fw_basic_topo_port_per_protocol(ports,
net_topo['router'],
ingr, egr)
# verify int-VM to int-VM ping fails
self.verify_connectivity_to_extvm(net_topo['server'],
traffic_result=False)
# verify ext-VM to intVM floating IP ping works
out = self._test_ping_from_external_network(net_topo['vm1_float_ip'])
self.assertIn("0 received", str(out))
# remove port from firewall group
self.fwaas_v2_client.update_firewall_v2_group(fw_topo['fw_group_id'],
ports=[])
self._wait_firewall_ready(fw_topo['fw_group_id'])
@decorators.attr(type='nsxv')
@decorators.idempotent_id('931488d7-9213-4b1e-a11d-15840c8e5f15')
def test_fwaas_ingress_allow_egress_deny_icmp_ipv6_address(self):
"""
Test fwaasv2 api to create icmp/tcp rule with ipv6 address IP
Verify rule get created successfully
"""
net_topo = self.create_network_topo(router_type='exclusive',
create_instance=True)
self.project_id = net_topo['project']
ports = net_topo['ports']
ingr = {'enable': True,
'rules': [{'proto': 'icmp', 'src': '2001:db8:a::123/64',
'dest': '2001:db8:a::124/64', 'action': 'deny',
'ip_version': '6'}
]}
egr = {'enable': True,
'rules': [{'proto': 'icmp', 'src': '2001:db8:a::123/64',
'dest': '2001:db8:a::124/64', 'action': 'allow',
'ip_version': '6'}
]}
fw_topo = \
self.create_fw_basic_topo_port_per_protocol(ports,
net_topo['router'],
ingr, egr)
# remove port from firewall group
self.fwaas_v2_client.update_firewall_v2_group(fw_topo['fw_group_id'],
ports=[])
self._wait_firewall_ready(fw_topo['fw_group_id'])
@decorators.attr(type='nsxv')
@decorators.idempotent_id('931488d7-9213-4b1e-a11d-15840c8e5f14')
def test_fwaas_ingress_egress_allow_icmp_no_snat(self):
"""
Test fwaasv2 api to create icmp/tcp rule with exact IP
vm1(123.0.0.123/24)->extvm(192.167.1.13)
egress rule 123.0.0.123 -> 192.167.1.13 allow, no-snat
ingress rule 192.167.1.13 -> 123.0.0.123, allow, no fip as no snat
have to add static route on ext machine so that return traffic
will reach to router first
route add 123.0.0.0/24 gw routerIP
ingress & egress traffic should work
"""
net_topo = self.create_network_topo(router_type='exclusive',
create_instance=True,
enable_snat=False)
self.project_id = net_topo['project']
ports = net_topo['ports']
ext_ip = self.public_network_cidr.rsplit('.', 1)[0] + '.13'
ingr = {'enable': True,
'rules': [{'proto': 'icmp', 'src': self.public_network_cidr,
'dest': net_topo['vm1_real_ip'], 'action': 'allow'},
{'proto': 'tcp', 'src': '0.0.0.0/0',
'dest': '0.0.0.0/0', 'action': 'allow'}
]}
egr = {'enable': True,
'rules': [{'proto': 'icmp', 'src': net_topo['vm1_real_ip'],
'dest': ext_ip, 'action': 'allow'}
]}
fw_topo = \
self.create_fw_basic_topo_port_per_protocol(ports,
net_topo['router'],
ingr, egr)
# add static route on tempest/ext vm
self._add_static_route(net_topo['vm1_real_ip'], net_topo['router'])
# verify ext-VM to intVM floating IP ping works
out = self._test_ping_from_external_network(net_topo['vm1_real_ip'])
self.assertIn("2 received", str(out))
# remove port from firewall group
self.fwaas_v2_client.update_firewall_v2_group(fw_topo['fw_group_id'],
ports=[])
self._wait_firewall_ready(fw_topo['fw_group_id'])
@decorators.attr(type='nsxv')
@decorators.idempotent_id('b5b188d7-0183-4b1e-9111-15840c8e2fd6')
def test_and_verify_fwaas_basic_udp(self):
"""
Test fwaasv2 api to create udp rule/policy/group and update it and
verifying its values
"""
net_topo = self.create_network_topo(router_type='exclusive',
create_instance=True)
self.project_id = net_topo['project']
ports = net_topo['ports']
ingr = {'enable': True,
'rules': [{'proto': 'udp', 'src': '0.0.0.0/0',
'dest': '0.0.0.0/0', 'action': 'deny'}
]}
egr = {'enable': True,
'rules': [{'proto': 'udp', 'src': '0.0.0.0/0',
'dest': '0.0.0.0/0', 'action': 'deny'}
]}
fw_topo = \
self.create_fw_basic_topo_port_per_protocol(ports,
net_topo['router'],
ingr, egr)
# remove port from firewall group
self.fwaas_v2_client.update_firewall_v2_group(fw_topo['fw_group_id'],
ports=[])
self._wait_firewall_ready(fw_topo['fw_group_id'])
@decorators.attr(type='nsxv')
@decorators.attr(type=["negative"])
@decorators.idempotent_id('534588d7-0183-4b12-a11d-15840c8e2fd6')
def test_delete_fw_group_when_port_in_use(self):
"""
Try to delete firewall group when its in use by port
"""
net_topo = self.create_network_topo(router_type='exclusive',
create_instance=True)
self.project_id = net_topo['project']
ports = net_topo['ports']
ingr = {'enable': True,
'rules': [{'proto': 'icmp', 'src': '0.0.0.0/0',
'dest': '0.0.0.0/0', 'action': 'deny'}
]}
egr = {'enable': True,
'rules': [{'proto': 'icmp', 'src': '0.0.0.0/0',
'dest': '0.0.0.0/0', 'action': 'deny'}
]}
fw_topo = \
self.create_fw_basic_topo_port_per_protocol(ports,
net_topo['router'],
ingr, egr)
# Deleting fw-group which is use by port
self.assertRaises(exceptions.Conflict,
self.fwaas_v2_client.delete_firewall_v2_group,
fw_topo['fw_group_id'])
# remove port from firewall group
self.fwaas_v2_client.update_firewall_v2_group(fw_topo['fw_group_id'],
ports=[])
self._wait_firewall_ready(fw_topo['fw_group_id'])
@decorators.attr(type='nsxv')
@decorators.attr(type=["negative"])
@decorators.idempotent_id('301228d7-0183-4b1e-a11d-35821c8e2fd6')
def test_delete_fw_rule_fw_policy_when_in_use(self):
"""
Try to delete firewall rule when its in use
"""
net_topo = self.create_network_topo(router_type='exclusive',
create_instance=True)
self.project_id = net_topo['project']
ports = net_topo['ports']
ingr = {'enable': True,
'rules': [{'proto': 'icmp', 'src': '0.0.0.0/0',
'dest': '0.0.0.0/0', 'action': 'deny'}
]}
egr = {'enable': True,
'rules': [{'proto': 'icmp', 'src': '0.0.0.0/0',
'dest': '0.0.0.0/0', 'action': 'deny'}
]}
fw_topo = \
self.create_fw_basic_topo_port_per_protocol(ports,
net_topo['router'],
ingr, egr)
# delete rule when used in policy
self.assertRaises(exceptions.Conflict,
self.fwaas_v2_client.delete_firewall_v2_rule,
fw_topo['ingress_rules'][0][0])
# delete policy wehen used in fw-group
self.assertRaises(exceptions.Conflict,
self.fwaas_v2_client.delete_firewall_v2_policy,
fw_topo['fw_ingr_policy'])
# remove port from firewall group
self.fwaas_v2_client.update_firewall_v2_group(fw_topo['fw_group_id'],
ports=[])
self._wait_firewall_ready(fw_topo['fw_group_id'])
@decorators.attr(type='nsxv')
@decorators.idempotent_id('88b188d7-0183-4b1e-a11d-15840c8e2345')
def test_fwaas_icmp_tcp_rule_with_distributed_router(self):
"""
Test fwaasv2 api to create tcp rule/policy/group with distributed
router port and update it and verifying its values
"""
net_topo = self.create_network_topo(router_type='distributed',
create_instance=True)
self.project_id = net_topo['project']
ports = net_topo['ports']
ext_ip = self.public_network_cidr.rsplit('.', 1)[0] + '.13'
ingr = {'enable': True,
'rules': [{'proto': 'icmp', 'src': ext_ip,
'dest': net_topo['vm1_float_ip'],
'action': 'allow'},
{'proto': 'tcp', 'src': '0.0.0.0/0',
'dest': '0.0.0.0/0', 'action': 'allow'}
]}
egr = {'enable': True,
'rules': [{'proto': 'icmp', 'src': net_topo['vm1_real_ip'],
'dest':ext_ip, 'action': 'allow'}
]}
fw_topo = \
self.create_fw_basic_topo_port_per_protocol(ports,
net_topo['router'],
ingr, egr)
# verify int-VM to int-VM ping fails
self.verify_connectivity_to_extvm(net_topo['server'],
traffic_result=True)
# verify ext-VM to intVM floating IP ping works
out = self._test_ping_from_external_network(net_topo['vm1_float_ip'])
self.assertIn("2 received", str(out))
# remove port from firewall group
self.fwaas_v2_client.update_firewall_v2_group(fw_topo['fw_group_id'],
ports=[])
self._wait_firewall_ready(fw_topo['fw_group_id'])
@decorators.attr(type='nsxv')
@decorators.idempotent_id('91b188d7-0183-4b1e-a11d-15840c8e2345')
def test_fwaas_router_port_icmp_rule_with_shared_router_should_fail(self):
"""
Test fwaasv2 api to create tcp rule/policy/group with shared
router port should fail
"""
net_topo = self.create_network_topo(router_type='shared',
create_instance=True)
self.project_id = net_topo['project']
ports = net_topo['ports']
ext_ip = self.public_network_cidr.rsplit('.', 1)[0] + '.13'
ingr = {'enable': True,
'rules': [{'proto': 'icmp', 'src': ext_ip,
'dest': net_topo['vm1_float_ip'],
'action': 'allow'},
{'proto': 'tcp', 'src': '0.0.0.0/0',
'dest': '0.0.0.0/0', 'action': 'allow'}
]}
egr = {'enable': True,
'rules': [{'proto': 'icmp', 'src': net_topo['vm1_real_ip'],
'dest':ext_ip, 'action': 'allow'}
]}
fw_topo = \
self.create_fw_basic_topo_port_per_protocol(ports,
net_topo['router'],
ingr, egr)
# Remove port from firewall group
self.fwaas_v2_client.update_firewall_v2_group(fw_topo['fw_group_id'],
ports=[])
self._wait_firewall_ready(fw_topo['fw_group_id'])
@decorators.attr(type='nsxv')
@decorators.idempotent_id('91b189d8-0183-4b1e-a11d-15840c8e2345')
def test_fwaas_group_wihtout_policy_added_to_router_port(self):
"""
Test fwaasv2 api to create fw group without any policy and
attach router port to it, should be INACTIVE
"""
net_topo = self.create_network_topo(router_type='exclusive',
create_instance=False)
self.project_id = net_topo['project']
ports = net_topo['ports']
ingr = {'enable': False}
egr = {'enable': False}
fw_topo = \
self.create_fw_basic_topo_port_per_protocol(ports,
net_topo['router'],
ingr, egr)
# Remove port from firewall group
self.fwaas_v2_client.update_firewall_v2_group(fw_topo['fw_group_id'],
ports=[])
self._wait_firewall_ready(fw_topo['fw_group_id'])
@decorators.attr(type='nsxv')
@decorators.idempotent_id('91b189d8-0283-4b1e-a11d-15840c8e2345')
def test_fwaas_group_with_policy_without_apply_to_port(self):
"""
Test fwaasv2 api to create fw group with any policy and
does not attach router port to it, fw should be INACTIVE
"""
net_topo = self.create_network_topo(router_type='exclusive',
create_instance=False)
self.project_id = net_topo['project']
ports = []
ingr = {'enable': True,
'rules': [{'proto': 'icmp', 'src': '0.0.0.0/0',
'dest': '0.0.0.0/0', 'action': 'deny'}
]}
egr = {'enable': True,
'rules': [{'proto': 'icmp', 'src': '0.0.0.0/0',
'dest': '0.0.0.0/0', 'action': 'deny'}
]}
fw_topo = \
self.create_fw_basic_topo_port_per_protocol(ports,
net_topo['router'],
ingr, egr)
# Remove port from firewall group
self.fwaas_v2_client.update_firewall_v2_group(fw_topo['fw_group_id'],
ports=[])
self._wait_firewall_ready(fw_topo['fw_group_id'])
@decorators.attr(type='nsxv')
@decorators.idempotent_id('91b189d8-0283-4b1e-a11d-25840c8e2345')
def test_fwaas_group_with_policy_wit_source_any_destination_ssh_port(self):
"""
Test fwaasv2 api to create fw group with any policy & rule with ports
attach router port to it, fw should be ACTIVE and check ingress
ssh traffic
"""
net_topo = self.create_network_topo(router_type='exclusive',
create_instance=True)
ports = net_topo['ports']
self.project_id = net_topo['project']
ingr = {'enable': True,
'rules': [{'proto': 'tcp', 'src_port': '1:65535',
'dest_port': '22', 'action': 'allow'}
]}
egr = {'enable': True,
'rules': [{'proto': 'tcp', 'src_port': '1:65535',
'dest_port': '22', 'action': 'allow'}
]}
fw_topo = \
self.create_fw_basic_topo_port_per_protocol(ports,
net_topo['router'],
ingr, egr)
# verify ext-VM to intVM tcp 22 works
out = \
self._test_tcp_port_from_external_network(net_topo['vm1_float_ip'])
self.assertIn("succeeded", str(out))
# Remove port from firewall group
self.fwaas_v2_client.update_firewall_v2_group(fw_topo['fw_group_id'],
ports=[])
self._wait_firewall_ready(fw_topo['fw_group_id'])
@decorators.attr(type='nsxv')
@decorators.idempotent_id('91b189d8-0283-4b1e-a11d-25841c8e2345')
def test_fwaas_group_with_policy_with_gateway_disabled(self):
"""
Test fwaasv2 api to create fw group with any policy & rule with ports
attach router port to it, but gateway is not set so rule should push
to backend
"""
net_topo = self.create_network_topo(router_type='exclusive',
create_instance=True,
set_gateway=False)
ports = net_topo['ports']
self.project_id = net_topo['project']
ingr = {'enable': True,
'rules': [{'proto': 'tcp', 'src': '0.0.0.0/0',
'src_port': '1:65535', 'dest': '0.0.0.0/0',
'dest_port': '22', 'action': 'allow'}
]}
egr = {'enable': True,
'rules': [{'proto': 'tcp', 'src_port': '1:65535',
'dest_port': '22', 'action': 'allow'}
]}
fw_topo = \
self.create_fw_basic_topo_port_per_protocol(ports,
net_topo['router'],
ingr, egr)
# Remove port from firewall group
self.fwaas_v2_client.update_firewall_v2_group(fw_topo['fw_group_id'],
ports=[])
self._wait_firewall_ready(fw_topo['fw_group_id'])
@decorators.attr(type='nsxv')
@decorators.idempotent_id('91b189d8-0283-4b1e-a12d-25841c8e2345')
def test_fwaas_group_with_policy_with_address_ports_gateway_enabled(self):
"""
Test fwaasv2 api to create fw group with any policy & rule with ports
attach router port to it, fw should be ACTIVE and check ingress
ssh traffic
"""
net_topo = self.create_network_topo(router_type='exclusive',
create_instance=True,
set_gateway=False)
ports = net_topo['ports']
self.project_id = net_topo['project']
ingr = {'enable': True,
'rules': [{'proto': 'tcp', 'src': '0.0.0.0/0',
'src_port': '1:65535', 'dest': '0.0.0.0/0',
'dest_port': '22', 'action': 'allow'}
]}
egr = {'enable': True,
'rules': [{'proto': 'tcp', 'src_port': '1:65535',
'dest_port': '22', 'action': 'allow'}
]}
fw_topo = \
self.create_fw_basic_topo_port_per_protocol(ports,
net_topo['router'],
ingr, egr)
# verify ext-VM to intVM tcp 22 works
out = \
self._test_tcp_port_from_external_network(net_topo['vm1_float_ip'])
self.assertIn("succeeded", str(out))
# Remove port from firewall group
self.fwaas_v2_client.update_firewall_v2_group(fw_topo['fw_group_id'],
ports=[])
self._wait_firewall_ready(fw_topo['fw_group_id'])
@decorators.attr(type='nsxv')
@decorators.idempotent_id('91b189d8-0283-4b1e-a11d-25840c8e2345')
def test_fwaas_group_remove_and_attach_to_router2(self):
"""
Test fwaasv2 api to create two fw group with two router port should be
active.
Remove firewall from one router port and attach to another router port
firewall group should be active
"""
topo = self.create_network_topo_with_two_rtr(router_type='exclusive',
create_instance=True)
port1 = topo['port1']
port2 = topo['port2']
self.project_id = topo['project']
ingr = {'enable': True,
'rules': [{'proto': 'tcp', 'src': '0.0.0.0/0',
'dest': '0.0.0.0/0', 'action': 'allow'}
]}
egr = {'enable': True,
'rules': [{'proto': 'tcp', 'src': '0.0.0.0/0',
'dest': '0.0.0.0/0', 'action': 'allow'}
]}
fw_topo1 = \
self.create_fw_basic_topo_port_per_protocol(port1,
topo['router1'],
ingr, egr)
fw_topo2 = \
self.create_fw_basic_topo_port_per_protocol(port2,
topo['router2'],
ingr, egr)
self.fwaas_v2_client.update_firewall_v2_group(fw_topo1['fw_group_id'],
ports=[])
self.fwaas_v2_client.update_firewall_v2_group(fw_topo2['fw_group_id'],
ports=port1)
self._wait_firewall_ready(fw_topo2['fw_group_id'])
# Verifying if fw2 is ACTIVE with port1 & fw1 in INACTIVE without port
show_group = \
self.show_firewall_group(fw_topo1['fw_group_id'])
self.assertEqual(show_group.get('firewall_group')['status'],
'INACTIVE')
show_group = \
self.show_firewall_group(fw_topo2['fw_group_id'])
self.assertEqual(show_group.get('firewall_group')['status'],
'ACTIVE')
self.assertEqual(show_group.get('firewall_group')['ports'],
port1)
self.fwaas_v2_client.update_firewall_v2_group(fw_topo2['fw_group_id'],
ports=[])
self._wait_firewall_ready(fw_topo2['fw_group_id'])
@decorators.attr(type='nsxv')
@decorators.attr(type=["negative"])
@decorators.idempotent_id('91b189d8-1483-4b1e-a11d-25840c8e2345')
def test_fwaas_group_attach_port_to_two_fw_group_should_fail(self):
"""
Test fwaasv2 api to create two fw group and attach router port should
be active. Try to attach port to another firewall group should fail
"""
topo = self.create_network_topo(router_type='exclusive',
create_instance=True)
port1 = topo['ports']
self.project_id = topo['project']
ingr = {'enable': True,
'rules': [{'proto': 'tcp', 'src': '0.0.0.0/0',
'dest': '0.0.0.0/0', 'action': 'allow'}
]}
egr = {'enable': True,
'rules': [{'proto': 'tcp', 'src': '0.0.0.0/0',
'dest': '0.0.0.0/0', 'action': 'allow'}
]}
fw_topo1 = \
self.create_fw_basic_topo_port_per_protocol(port1,
topo['router'],
ingr, egr)
port2 = []
fw_topo2 = \
self.create_fw_basic_topo_port_per_protocol(port2,
topo['router'],
ingr, egr)
self.assertRaises(exceptions.Conflict,
self.fwaas_v2_client.update_firewall_v2_group,
fw_topo2['fw_group_id'],
ports=port1)
self.fwaas_v2_client.update_firewall_v2_group(fw_topo1['fw_group_id'],
ports=[])
self._wait_firewall_ready(fw_topo1['fw_group_id'])
@decorators.attr(type='nsxv')
@decorators.idempotent_id('91b289d8-4583-4b1e-a11d-25840c8e2345')
def test_fwaas_group_attach_to_two_different_port(self):
"""
Test fwaasv2 api to create a fw group and attach to two different ports
"""
topo = self.create_network_topo_with_two_rtr(router_type='exclusive',
create_instance=True)
ports = [topo['port1'][0], topo['port2'][0]]
self.project_id = topo['project']
ingr = {'enable': True,
'rules': [{'proto': 'tcp', 'src': '0.0.0.0/0',
'dest': '0.0.0.0/0', 'action': 'allow'}
]}
egr = {'enable': True,
'rules': [{'proto': 'tcp', 'src': '0.0.0.0/0',
'dest': '0.0.0.0/0', 'action': 'allow'}
]}
fw_topo = \
self.create_fw_basic_topo_port_per_protocol(ports, topo['router1'],
ingr, egr)
self.fwaas_v2_client.update_firewall_v2_group(fw_topo['fw_group_id'],
ports=[])
self._wait_firewall_ready(fw_topo['fw_group_id'])
@decorators.attr(type='nsxv')
@decorators.idempotent_id('91b199d8-4583-4b1e-a11d-25840c8e2345')
def test_fwaas_rule_in_multiple_policies_one_policy_in_multiple_fwg(self):
"""
Test fwaasv2 api to create a fw rule and use in multiple policies.
"""
topo = self.create_network_topo(router_type='exclusive',
create_instance=True)
ports = topo['ports']
self.project_id = topo['project']
rules = [{'proto': 'tcp', 'src': '0.0.0.0/0', 'dest': '0.0.0.0/0',
'action': 'allow'}]
# create_rule
rules = self._create_rules(rules)
# create policy1 with above rule
policy1 = self._create_policy('policy1', rules)
# create policy2 with above rule
policy2 = self._create_policy('policy2', rules)
# verify policy1 & policy2 has same rule
show_policy1 = self.show_firewall_policy(policy1)
show_policy2 = self.show_firewall_policy(policy2)
self.assertEqual(
show_policy1.get('firewall_policy')['name'],
'policy1')
self.assertEqual(
show_policy2.get('firewall_policy')['name'],
'policy2')
self.assertEqual(show_policy1.get('firewall_policy')
['firewall_rules'], [rules[0][0]])
self.assertEqual(show_policy2.get('firewall_policy')
['firewall_rules'], [rules[0][0]])
# create fw1 & fw2
ports = []
fw_group1 = self.create_firewall_group(
name='fw-grp1',
ingress_firewall_policy_id=policy1,
egress_firewall_policy_id=policy1,
ports=ports,
project_id=self.project_id)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.fwaas_v2_client.delete_firewall_v2_group,
fw_group1["firewall_group"]["id"])
fw_group2 = self.create_firewall_group(
name='fw-grp2',
ingress_firewall_policy_id=policy1,
egress_firewall_policy_id=policy1,
ports=ports,
project_id=self.project_id)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.fwaas_v2_client.delete_firewall_v2_group,
fw_group2["firewall_group"]["id"])
# verify fw_group1 & fw_group1 has same policies configured
show_grp1 = self.show_firewall_group(fw_group1["firewall_group"]["id"])
show_grp2 = self.show_firewall_group(fw_group2["firewall_group"]["id"])
self.assertEqual(show_grp1.get('firewall_group')[
'ingress_firewall_policy_id'], policy1)
self.assertEqual(show_grp1.get('firewall_group')[
'egress_firewall_policy_id'], policy1)
self.assertEqual(show_grp2.get('firewall_group')[
'ingress_firewall_policy_id'], policy1)
self.assertEqual(show_grp2.get('firewall_group')[
'egress_firewall_policy_id'], policy1)
@decorators.attr(type='nsxv')
@decorators.idempotent_id('4a0306e5-663c-4981-8177-e8a255a8859c')
def test_CRUD_on_firewall(self):
"""
Test fwaasv2 api to create a fw rule and use in multiple policies.
"""
topo = self.create_network_topo(router_type='exclusive',
create_instance=True)
ports = topo['ports']
self.project_id = topo['project']
rules = [{'proto': 'icmp', 'src': '0.0.0.0/0', 'dest': '0.0.0.0/0',
'action': 'allow'}]
# create_rule
rules = self._create_rules(rules)
# Update rule & verify
rule_id = rules[0][0]
self.update_firewall_rule(rule_id, protocol='tcp')
self.update_firewall_rule(rule_id, name='new-rule')
show_rules = self.show_firewall_rule(rule_id)
# Check firewall rule after updated value
self.assertEqual(show_rules.get('firewall_rule')['name'], 'new-rule')
self.assertEqual(show_rules.get('firewall_rule')['protocol'], 'tcp')
self.assertEqual(show_rules.get('firewall_rule')['action'], 'allow')
self.assertEqual(show_rules.get('firewall_rule')['source_ip_address'],
'0.0.0.0/0')
# create policy1 with above rule
policy1 = self._create_policy('policy1', rules)
# verify policy
show_policy1 = self.show_firewall_policy(policy1)
self.assertEqual(
show_policy1.get('firewall_policy')['name'],
'policy1')
self.assertEqual(show_policy1.get('firewall_policy')
['firewall_rules'], [rules[0][0]])
# Update policy
self.update_firewall_policy(policy1, name='new-policy')
show_policy = self.show_firewall_policy(policy1)
# Check firewall policy after updated
self.assertEqual(
show_policy.get('firewall_policy')['name'],
'new-policy')
# create firewall group
fw_group1 = self.create_firewall_group(
name='fw-grp1',
ingress_firewall_policy_id=policy1,
egress_firewall_policy_id=policy1,
ports=ports,
project_id=self.project_id)
fw_grp1_id = fw_group1["firewall_group"]["id"]
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.fwaas_v2_client.delete_firewall_v2_group,
fw_group1["firewall_group"]["id"])
self._wait_firewall_ready(fw_grp1_id)
# verify fw_group1
show_grp1 = self.show_firewall_group(fw_grp1_id)
self.assertEqual(show_grp1.get('firewall_group')[
'ingress_firewall_policy_id'], policy1)
self.assertEqual(show_grp1.get('firewall_group')[
'egress_firewall_policy_id'], policy1)
# Update firewall group
self.update_firewall_group(fw_grp1_id,
name='new-group')
show_group = self.show_firewall_group(fw_grp1_id)
# Check firewall updated group
self.assertEqual(
show_group.get('firewall_group')['name'],
'new-group')
self.assertEqual(show_group.get('firewall_group')[
'ingress_firewall_policy_id'], policy1)
self.fwaas_v2_client.update_firewall_v2_group(fw_grp1_id,
ports=[])
self._wait_firewall_ready(fw_grp1_id)