Add fullstack tests for stateless security groups

Blueprint: stateless-security-groups

Change-Id: I0ecb52ef78a2fc264d1d5d29366c56ecf4503bb7
This commit is contained in:
Lina He 2020-02-27 12:13:39 +00:00 committed by lina He
parent bfc4ffa389
commit 2a36ea26cc
2 changed files with 142 additions and 5 deletions

View File

@ -317,14 +317,18 @@ class ClientFixture(fixtures.Fixture):
}
return self.client.trunk_remove_subports(trunk_id, spec)
def create_security_group(self, tenant_id, name=None):
def create_security_group(self, tenant_id, name=None, stateful=True):
resource_type = 'security_group'
name = name or utils.get_rand_name(prefix=resource_type)
spec = {'tenant_id': tenant_id, 'name': name}
spec = {'tenant_id': tenant_id, 'name': name, 'stateful': stateful}
return self._create_resource(resource_type, spec)
def update_security_group(self, security_group_id, **kwargs):
return self._update_resource('security_group', security_group_id,
kwargs)
def create_security_group_rule(self, tenant_id, security_group_id,
**kwargs):
resource_type = 'security_group_rule'

View File

@ -16,6 +16,8 @@ from neutron_lib import constants
from neutronclient.common import exceptions as nc_exc
from oslo_utils import uuidutils
from neutron.agent.linux import iptables_firewall
from neutron.agent.linux import iptables_manager
from neutron.cmd.sanity import checks
from neutron.common import utils as common_utils
from neutron.tests.common import net_helpers
@ -27,6 +29,10 @@ from neutron.tests.unit import testlib_api
load_tests = testlib_api.module_load_tests
class StatelessRulesNotConfiguredException(Exception):
pass
class OVSVersionChecker(object):
conntrack_supported = None
@ -85,7 +91,7 @@ class TestSecurityGroupsSameNetwork(BaseSecurityGroupsSameNetworkTest):
network_type = 'vxlan'
scenarios = [
# TODO(njohnston): Re-add the linuxbridge scenario once it is # stable
# TODO(njohnston): Re-add the linuxbridge scenario once it is stable
# The iptables_hybrid driver lacks isolation between agents and
# because of that using only one host is enough
('ovs-hybrid', {
@ -117,6 +123,8 @@ class TestSecurityGroupsSameNetwork(BaseSecurityGroupsSameNetworkTest):
10. test two vms with same mac on the same host in different
networks
11. test using multiple security groups
12. test stateless security groups when firewall driver is
iptables or iptables_hybrid.
"""
tenant_uuid = uuidutils.generate_uuid()
@ -300,6 +308,12 @@ class TestSecurityGroupsSameNetwork(BaseSecurityGroupsSameNetworkTest):
# 11. Check using multiple security groups
self._test_using_multiple_security_groups()
# 12. test stateless security groups when firewall driver is iptables
# or iptables_hybrid.
# TODO(njohnston): Re-add the iptables here once it is stable
if self.firewall_driver == 'iptables_hybrid':
self._test_stateless_security_groups()
def _test_using_multiple_security_groups(self):
"""Test using multiple security groups.
@ -395,9 +409,127 @@ class TestSecurityGroupsSameNetwork(BaseSecurityGroupsSameNetworkTest):
self.verify_no_connectivity_between_vms(
vms[1], vms[0], net_helpers.NetcatTester.TCP, 22)
def _test_stateless_security_groups(self):
"""Test stateless security groups.
This test will run basic tests for stateless security groups
1. Check the stateless rules are configured for vm interfaces when
firewall driver is iptables or iptables_hybrid.
2. Check connection is blocked for stateless security group.
3. Check if connection from allowed security group is permitted.
4. Check if traffic not explicitly allowed (eg. ICMP) is blocked.
5. Check if a security group update for entire port range works.
6. Check conflict when adding stateful and stateless security groups
to the same port
"""
tenant_uuid = uuidutils.generate_uuid()
subnet_cidr = '40.0.0.0/24'
vms, ports, sgs, _, _ = self._create_resources(
tenant_uuid, subnet_cidr, stateful=False)
# Apply security groups to the ports
for port in ports[:2]:
self.safe_client.client.update_port(
port['id'],
body={'port': {'port_security_enabled': True,
'security_groups': [sgs[0]['id']]}})
# Check the stateless rules are configured for vm interfaces when
# firewall driver is iptables or iptables_hybrid.
self._validate_stateless_rules(vms[:2])
# Connection is blocked for stateless security group
self.assert_no_connection(
vms[1].namespace, vms[0].namespace, vms[0].ip, 3333,
net_helpers.NetcatTester.TCP)
self.assert_no_connection(
vms[1].namespace, vms[0].namespace, vms[0].ip, 3333,
net_helpers.NetcatTester.TCP, src_port=3344)
rule1 = self.safe_client.create_security_group_rule(
tenant_uuid, sgs[0]['id'],
remote_group_id=sgs[0]['id'], direction='ingress',
ethertype=constants.IPv4,
protocol=constants.PROTO_NAME_TCP,
port_range_min=3344, port_range_max=3344)
# Check if connection from allowed security group is permitted
self.assert_connection(
vms[1].namespace, vms[0].namespace, vms[0].ip, 3333,
net_helpers.NetcatTester.TCP, src_port=3344)
# Check if traffic not explicitly allowed (eg. ICMP) is blocked
vms[0].block_until_no_ping(vms[1].ip)
vms[0].block_until_no_ping(vms[2].ip)
vms[1].block_until_no_ping(vms[2].ip)
# Check if a security group update for entire port range works
self.client.delete_security_group_rule(rule1['id'])
self.assert_no_connection(
vms[1].namespace, vms[0].namespace, vms[0].ip, 3344,
net_helpers.NetcatTester.TCP)
self.assert_no_connection(
vms[1].namespace, vms[0].namespace, vms[0].ip, 3366,
net_helpers.NetcatTester.TCP)
self.safe_client.create_security_group_rule(
tenant_uuid, sgs[0]['id'],
remote_group_id=sgs[0]['id'], direction='ingress',
ethertype=constants.IPv4,
protocol=constants.PROTO_NAME_TCP,
port_range_min=1, port_range_max=65535)
self.assert_connection(
vms[1].namespace, vms[0].namespace, vms[0].ip, 3344,
net_helpers.NetcatTester.TCP)
self.assert_connection(
vms[1].namespace, vms[0].namespace, vms[0].ip, 3366,
net_helpers.NetcatTester.TCP)
# Update sg1 to be stateful
self.safe_client.update_security_group(sgs[1]['id'],
**{'stateful': True})
# Check conflict when adding stateful and stateless security groups
# to the same port
self.assertRaises(nc_exc.Conflict,
self.safe_client.client.update_port,
ports[0]['id'],
body={'port': {
'security_groups': [sgs[0]['id'],
sgs[1]['id']]}})
def _validate_stateless_rules(self, vms):
"""Check if stateless rules from iptables firewall are configured.
Raises StatelessRulesNotConfiguredException exception if no stateless
rules are found.
"""
for vm in vms:
iptables = iptables_manager.IptablesManager(
namespace=vm.host.host_namespace)
vm_tap_device = iptables_firewall.get_hybrid_port_name(
vm.neutron_port['id'])
common_utils.wait_until_true(
lambda: self._is_stateless_configured(iptables,
vm_tap_device),
exception=StatelessRulesNotConfiguredException(
"There are no stateless rules configured for "
"interface %s" % vm_tap_device))
@staticmethod
def _is_stateless_configured(iptables, vm_tap_device):
filter_rules = iptables.get_rules_for_table('raw')
return any((vm_tap_device and '--notrack') in line for
line in filter_rules)
# NOTE: This can be used after refactor other tests to
# one scenario one test.
def _create_resources(self, tenant_uuid, subnet_cidr):
def _create_resources(self, tenant_uuid, subnet_cidr, stateful=True):
if self.firewall_driver == 'iptables_hybrid':
# The iptables_hybrid driver lacks isolation between agents
index_to_host = [0] * 4
@ -408,7 +540,8 @@ class TestSecurityGroupsSameNetwork(BaseSecurityGroupsSameNetworkTest):
self.safe_client.create_subnet(
tenant_uuid, network['id'], subnet_cidr)
sgs = [self.safe_client.create_security_group(tenant_uuid)
sgs = [self.safe_client.create_security_group(tenant_uuid,
stateful=stateful)
for i in range(3)]
ports = [
self.safe_client.create_port(tenant_uuid, network['id'],