vmware-nsx-tempest-plugin/vmware_nsx_tempest_plugin/tests/api/test_v2_fwaas.py

359 lines
15 KiB
Python

# Copyright 2018 VMware Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from oslo_log import log as logging
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import decorators
from tempest.lib import exceptions
from tempest import test
from vmware_nsx_tempest_plugin.common import constants
from vmware_nsx_tempest_plugin.lib import feature_manager
from vmware_nsx_tempest_plugin.services import nsx_client
# Module-level logger for this test file.
LOG = logging.getLogger(__name__)
# Handle on the tempest configuration object.
CONF = config.CONF
# Import-time override: validation.auth_method is set to the *string*
# 'None' (not the None singleton) on the CONF object obtained above.
# NOTE(review): presumably this disables credential-based guest
# validation for these API tests -- confirm against tempest's options.
CONF.validation.auth_method = 'None'
class TestFwaasV2Ops(feature_manager.FeatureManager):
    """API tests for FWaaS v2 firewall rules, policies and groups.

    The ``create_fw_*`` helpers build a rule -> policy -> group chain
    (optionally attached to a router port) and assert on the values read
    back from the API.  The ``test_*`` methods drive those helpers and
    check the conflict behaviour of deleting resources that are in use.
    """

    @classmethod
    def skip_checks(cls):
        """Skip the class when the ``fwaas_v2`` extension is not enabled."""
        super(TestFwaasV2Ops, cls).skip_checks()
        if not test.is_extension_enabled('fwaas_v2', 'network'):
            msg = "Extension fwaas_v2 is not enabled."
            raise cls.skipException(msg)

    @classmethod
    def setup_credentials(cls):
        """Set network resources and fetch the admin client manager.

        Both calls are made before delegating to the parent class.
        """
        cls.set_network_resources()
        cls.admin_mgr = cls.get_client_manager('admin')
        super(TestFwaasV2Ops, cls).setup_credentials()

    @classmethod
    def setup_clients(cls):
        """
        Create various client connections. Such as NSX.
        """
        super(TestFwaasV2Ops, cls).setup_clients()
        cls.nsx_client = nsx_client.NSXClient(
            CONF.network.backend,
            CONF.nsxv3.nsx_manager,
            CONF.nsxv3.nsx_user,
            CONF.nsxv3.nsx_password)

    def create_fw_basic_topo(self, protocol_name=None, source_ip=None):
        """Create, verify and update a firewall rule, policy and group.

        The rule is created with ``protocol_name`` and afterwards updated
        to protocol ``'tcp'`` and name ``'new-rule'``; the policy is
        renamed to ``'new-policy'``.  Every step is read back through the
        corresponding ``show_*`` call and asserted.

        :param protocol_name: protocol of the initial rule; ``'icmp'`` is
            used when it is None.
        :param source_ip: passed as ``source_ip_address`` of the rule.
        :returns: dict with keys ``fw_rules``, ``fw_policy`` and
            ``fw_group`` holding the respective create responses.
        """
        if protocol_name is None:
            protocol_name = 'icmp'

        # Create firewall rule
        rule_name = data_utils.rand_name('fw-rule-')
        fw_rules = self.create_firewall_rule(name=rule_name,
                                             protocol=protocol_name,
                                             source_ip_address=source_ip)
        rule_id = fw_rules['firewall_rule']['id']
        # Check firewall rule
        shown_rule = self.show_firewall_rule(rule_id).get('firewall_rule')
        self.assertEqual(shown_rule['name'], rule_name)
        self.assertEqual(shown_rule['protocol'], protocol_name)
        # Update firewall rule (protocol and name in separate requests)
        self.update_firewall_rule(rule_id, protocol='tcp')
        self.update_firewall_rule(rule_id, name='new-rule')
        # Check firewall rule after updated value
        shown_rule = self.show_firewall_rule(rule_id).get('firewall_rule')
        self.assertEqual(shown_rule['name'], 'new-rule')
        self.assertEqual(shown_rule['protocol'], 'tcp')

        # Create firewall policy
        rules = [rule_id]
        policy_name = data_utils.rand_name('fw-policy-')
        fw_policy = self.create_firewall_policy(name=policy_name,
                                                firewall_rules=rules)
        policy_id = fw_policy['firewall_policy']['id']
        # Check firewall policy
        shown_policy = self.show_firewall_policy(policy_id).get(
            'firewall_policy')
        self.assertEqual(shown_policy['name'], policy_name)
        self.assertEqual(shown_policy['firewall_rules'], rules)
        # Update firewall policy
        self.update_firewall_policy(policy_id, name='new-policy')
        # Check firewall policy after the rename
        shown_policy = self.show_firewall_policy(policy_id).get(
            'firewall_policy')
        self.assertEqual(shown_policy['name'], 'new-policy')

        # Create firewall group using the policy in both directions
        group_name = data_utils.rand_name('fw-group-')
        fw_group = self.create_firewall_group(
            name=group_name,
            ingress_firewall_policy_id=policy_id,
            egress_firewall_policy_id=policy_id)
        # Check firewall group values
        shown_group = self.show_firewall_group(
            fw_group["firewall_group"]["id"]).get('firewall_group')
        self.assertEqual(shown_group['name'], group_name)
        self.assertEqual(shown_group['ingress_firewall_policy_id'],
                         policy_id)
        self.assertEqual(shown_group['egress_firewall_policy_id'],
                         policy_id)

        return dict(fw_rules=fw_rules, fw_policy=fw_policy,
                    fw_group=fw_group)

    def create_fw_group_port_topo(
            self,
            group_delete=True,
            project_id=None,
            ports=None,
            protocol_name=None,
            source_ip=None):
        """Create a rule, policy and group bound to ``ports``.

        A cleanup that ignores NotFound is registered for each created
        resource.  After the group is created the method waits for it via
        ``_wait_firewall_ready`` and asserts that its ports match
        ``ports``.

        :param group_delete: when exactly ``True`` the ports are detached,
            the group values are re-read and verified, and the group is
            deleted before returning.
        :param project_id: project used for rule, policy and group.
        :param ports: port ids to attach to the firewall group.
        :param protocol_name: rule protocol; ``'icmp'`` when None.
        :param source_ip: passed as ``source_ip_address`` of the rule.
        :returns: dict with keys ``fw_rules`` and ``fw_policy`` (create
            responses) and ``fw_group`` (the group as read back after it
            became ready, i.e. before any port detach).
        """
        if protocol_name is None:
            protocol_name = 'icmp'

        # Create firewall rule
        rule_name = data_utils.rand_name('fw-rule-')
        fw_rules = self.create_firewall_rule(
            name=rule_name, protocol=protocol_name, project_id=project_id,
            source_ip_address=source_ip)
        rule_id = fw_rules['firewall_rule']['id']
        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                        self.fwaas_v2_client.delete_firewall_v2_rule,
                        rule_id)
        # Check firewall rule
        shown_rule = self.show_firewall_rule(rule_id).get('firewall_rule')
        self.assertEqual(shown_rule['name'], rule_name)
        self.assertEqual(shown_rule['protocol'], protocol_name)

        # Create firewall policy
        rules = [rule_id]
        policy_name = data_utils.rand_name('fw-policy-')
        fw_policy = self.create_firewall_policy(name=policy_name,
                                                firewall_rules=rules,
                                                project_id=project_id)
        policy_id = fw_policy['firewall_policy']['id']
        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                        self.fwaas_v2_client.delete_firewall_v2_policy,
                        policy_id)
        # Check firewall policy
        shown_policy = self.show_firewall_policy(policy_id).get(
            'firewall_policy')
        self.assertEqual(shown_policy['name'], policy_name)
        self.assertEqual(shown_policy['firewall_rules'], rules)

        # Create firewall group
        group_name = data_utils.rand_name('fw-group-')
        fw_group = self.create_firewall_group(
            name=group_name,
            ingress_firewall_policy_id=policy_id,
            egress_firewall_policy_id=policy_id,
            ports=ports,
            project_id=project_id)
        group_id = fw_group["firewall_group"]["id"]
        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                        self.fwaas_v2_client.delete_firewall_v2_group,
                        group_id)
        self._wait_firewall_ready(group_id)
        show_group = self.show_firewall_group(group_id)
        self.assertEqual(show_group.get('firewall_group')['ports'], ports)
        # Callers receive the read-back group rather than the create
        # response (one test in this class reads its 'status' field).
        fw_group = show_group
        fw_topo = dict(fw_rules=fw_rules, fw_policy=fw_policy,
                       fw_group=fw_group)

        if group_delete is True:
            # Update firewall group: detach every port
            self.update_firewall_group(group_id, ports=[])
            time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
            # Check updated values of firewall group.  The group is read
            # again here; asserting on the pre-update response would not
            # verify anything about the updated group.
            updated_group = self.show_firewall_group(group_id).get(
                'firewall_group')
            self.assertEqual(updated_group['name'], group_name)
            self.assertEqual(updated_group['ingress_firewall_policy_id'],
                             policy_id)
            self.assertEqual(updated_group['egress_firewall_policy_id'],
                             policy_id)
            # Delete firewall group (ports are cleared once more through
            # the raw client before the delete request, as before)
            self.fwaas_v2_client.update_firewall_v2_group(group_id,
                                                          ports=[])
            self.fwaas_v2_client.delete_firewall_v2_group(group_id)
        return fw_topo

    def create_fw_with_port_topology(self, protocol_name,
                                     group_delete=True,
                                     source_ip=None,
                                     create_instance=False):
        """Build network/router/subnet and a firewall on the router port.

        :param protocol_name: protocol for the firewall rule.
        :param group_delete: forwarded to ``create_fw_group_port_topo``.
        :param source_ip: rule source address; only forwarded when
            ``group_delete`` is falsy (see note in the body).
        :param create_instance: when true, two instances with floating
            ips are booted on the network and ``source_ip`` is replaced
            by the fixed ip address of the first one.
        :returns: the dict returned by ``create_fw_group_port_topo``.
        """
        # Create network topo
        network = self.create_topology_network(network_name="fw-network")
        # Create router topo
        router = self.create_topology_router('fw-router')
        # Create subnet topo
        self.create_topology_subnet('fw-subnet', network,
                                    router_id=router['id'])
        if create_instance:
            image_id = self.get_glance_image_id(["cirros", "esx"])
            for vm_name in ("state_vm_1", "state_vm_2"):
                self.create_topology_instance(
                    vm_name, [network],
                    create_floating_ip=True, image_id=image_id)
            floating_ips = self.topology_servers['state_vm_1'][
                'floating_ips']
            source_ip = floating_ips[0]['fixed_ip_address']

        ports = [self.get_router_port(self.ports_client)]
        # NOTE(review): source_ip is deliberately not forwarded when the
        # group is going to be deleted; this mirrors the long-standing
        # behaviour of this helper -- confirm whether that is intended.
        rule_source_ip = None if group_delete else source_ip
        return self.create_fw_group_port_topo(
            group_delete, network['project_id'], ports, protocol_name,
            rule_source_ip)

    @decorators.attr(type='nsxv3')
    @decorators.idempotent_id('431288d7-9213-4b1e-a11d-15840c8e2f12')
    def test_fwaas_basic_icmp(self):
        """
        Test fwaasv2 api to create icmp rule/policy/group and update it and
        verifying its values
        """
        self.create_fw_basic_topo('icmp')

    @decorators.attr(type='nsxv3')
    @decorators.idempotent_id('76a188d7-9812-5b1e-a11d-65840c9e2fd6')
    def test_fwaas_basic_tcp(self):
        """
        Test fwaasv2 api to create tcp rule/policy/group and update it and
        verifying its values
        """
        self.create_fw_basic_topo('tcp')

    @decorators.attr(type='nsxv3')
    @decorators.idempotent_id('a5b188d7-0183-4b1e-9111-15840c8e2fd6')
    def test_fwaas_basic_udp(self):
        """
        Test fwaasv2 api to create udp rule/policy/group and update it and
        verifying its values
        """
        self.create_fw_basic_topo('udp')

    @decorators.attr(type='nsxv3')
    @decorators.idempotent_id('95b188d7-0183-4b1e-a11d-15840c8e2345')
    def test_fwaas_router_port_icmp(self):
        """
        Test fwaasv2 api to create icmp rule/policy/group with router port and
        update it and verifying its values
        """
        self.create_fw_with_port_topology('icmp')

    @decorators.attr(type='nsxv3')
    @decorators.idempotent_id('25b188d7-0183-4b1e-5123-15840c8e2fd6')
    def test_fwaas_router_port_tcp(self):
        """
        Test fwaasv2 api to create tcp rule/policy/group with router port and
        update it and verifying its values
        """
        self.create_fw_with_port_topology('tcp')

    @decorators.attr(type='nsxv3')
    @decorators.idempotent_id('501288d7-0183-4b1e-a11d-15840c8e2fd6')
    def test_fwaas_router_port_udp(self):
        """
        Test fwaasv2 api to create udp rule/policy/group with router port and
        update it and verifying its values
        """
        self.create_fw_with_port_topology('udp')

    @decorators.attr(type='nsxv3')
    @decorators.attr(type=["negative"])
    @decorators.idempotent_id('434588d7-0183-4b12-a11d-15840c8e2fd6')
    def test_delete_fw_group_when_port_in_use(self):
        """
        Try to delete firewall group when its in use
        """
        fw_topo = self.create_fw_with_port_topology(
            group_delete=False, protocol_name='icmp')
        group_id = fw_topo["fw_group"]["firewall_group"]["id"]
        # Deleting while the router port is still attached must conflict
        self.assertRaises(exceptions.Conflict,
                          self.fwaas_v2_client.delete_firewall_v2_group,
                          group_id)
        # Detach the port, after which the delete is issued again
        self.update_firewall_group(group_id, ports=[])
        self.fwaas_v2_client.delete_firewall_v2_group(group_id)

    @decorators.attr(type='nsxv3')
    @decorators.attr(type=["negative"])
    @decorators.idempotent_id('201228d7-0183-4b1e-a11d-35821c8e2fd6')
    def test_delete_fw_rule_when_in_use(self):
        """
        Try to delete firewall rule when its in use
        """
        fw_topo = self.create_fw_basic_topo('icmp')
        self.assertRaises(exceptions.Conflict,
                          self.fwaas_v2_client.delete_firewall_v2_rule,
                          fw_topo["fw_rules"]["firewall_rule"]["id"])

    @decorators.attr(type='nsxv3')
    @decorators.attr(type=["negative"])
    @decorators.idempotent_id('901488d7-1184-4b1e-511d-15878c8e2fd6')
    def test_delete_fw_policy_when_in_use(self):
        """
        Try to delete firewall policy when its in use
        """
        fw_topo = self.create_fw_basic_topo('icmp')
        self.assertRaises(exceptions.Conflict,
                          self.fwaas_v2_client.delete_firewall_v2_policy,
                          fw_topo["fw_policy"]["firewall_policy"]["id"])

    @decorators.attr(type='nsxv3')
    @decorators.attr(type=["negative"])
    # This id used to duplicate the one on
    # test_delete_fw_policy_when_in_use; idempotent ids must be unique
    # per test, so this test carries its own id.
    @decorators.idempotent_id('b7e4c2a1-5d3f-4e8a-9c61-0f2d7a3b8e94')
    def test_verify_firewall_group_source_ip_invalid(self):
        """
        Verify Firewall group with invalid ip should be ACTIVE
        """
        cidr = '0.0.0.0/0'
        fw_topo = self.create_fw_with_port_topology('icmp',
                                                    source_ip=cidr,
                                                    group_delete=False)
        group = fw_topo['fw_group']['firewall_group']
        self.assertEqual("ACTIVE", group['status'])
        # Detach the router port from the group before the test ends
        self.update_firewall_group(group["id"], ports=[])