fullstack: Add security group tests

Change-Id: Ie1fbc3c253805120f44692e3459a64537b7ae340
This commit is contained in:
IWAMOTO Toshihiro 2016-07-07 16:04:55 +09:00
parent 2ad5208854
commit ff9c6e219b
4 changed files with 191 additions and 3 deletions

View File

@ -89,7 +89,7 @@ class ClientFixture(fixtures.Fixture):
return self._create_resource(resource_type, spec)
def create_port(self, tenant_id, network_id, hostname=None,
qos_policy_id=None, **kwargs):
qos_policy_id=None, security_groups=None, **kwargs):
spec = {
'network_id': network_id,
'tenant_id': tenant_id,
@ -99,6 +99,8 @@ class ClientFixture(fixtures.Fixture):
spec[portbindings.HOST_ID] = hostname
if qos_policy_id:
spec['qos_policy_id'] = qos_policy_id
if security_groups:
spec['security_groups'] = security_groups
return self._create_resource('port', spec)
def create_floatingip(self, tenant_id, floating_network_id,
@ -244,3 +246,21 @@ class ClientFixture(fixtures.Fixture):
'sub_ports': sub_ports,
}
return self.client.trunk_remove_subports(trunk_id, spec)
def create_security_group(self, tenant_id, name=None):
resource_type = 'security_group'
name = name or utils.get_rand_name(prefix=resource_type)
spec = {'tenant_id': tenant_id, 'name': name}
return self._create_resource(resource_type, spec)
def create_security_group_rule(self, tenant_id, security_group_id,
**kwargs):
resource_type = 'security_group_rule'
spec = {'tenant_id': tenant_id,
'security_group_id': security_group_id}
spec.update(kwargs)
return self._create_resource(resource_type, spec)

View File

@ -154,7 +154,7 @@ class OVSConfigFixture(ConfigFixture):
'ovsdb_interface': host_desc.ovsdb_interface,
},
'securitygroup': {
'firewall_driver': 'noop',
'firewall_driver': host_desc.firewall_driver,
},
'agent': {
'l2_population': str(self.env_desc.l2_pop),

View File

@ -55,11 +55,13 @@ class HostDescription(object):
"""
def __init__(self, l3_agent=False, of_interface='ovs-ofctl',
ovsdb_interface='vsctl',
l2_agent_type=constants.AGENT_TYPE_OVS):
l2_agent_type=constants.AGENT_TYPE_OVS,
firewall_driver='noop'):
self.l2_agent_type = l2_agent_type
self.l3_agent = l3_agent
self.of_interface = of_interface
self.ovsdb_interface = ovsdb_interface
self.firewall_driver = firewall_driver
class Host(fixtures.Fixture):

View File

@ -0,0 +1,166 @@
# Copyright 2015 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib import constants
from oslo_utils import uuidutils
from neutron.cmd.sanity import checks
from neutron.tests.common import net_helpers
from neutron.tests.fullstack import base
from neutron.tests.fullstack.resources import environment
from neutron.tests.fullstack.resources import machine
from neutron.tests.unit import testlib_api
load_tests = testlib_api.module_load_tests
class OVSVersionChecker(object):
conntrack_supported = None
@classmethod
def supports_ovsfirewall(cls):
if cls.conntrack_supported is None:
cls.conntrack_supported = checks.ovs_conntrack_supported()
return cls.conntrack_supported
class BaseSecurityGroupsSameNetworkTest(base.BaseFullStackTestCase):
of_interface = None
ovsdb_interface = None
def setUp(self):
if (self.firewall_driver == 'openvswitch' and
not OVSVersionChecker.supports_ovsfirewall()):
self.skipTest("Open vSwitch firewall_driver doesn't work "
"with this version of ovs.")
host_descriptions = [
environment.HostDescription(
of_interface=self.of_interface,
ovsdb_interface=self.ovsdb_interface,
l2_agent_type=self.l2_agent_type,
firewall_driver=self.firewall_driver) for _ in range(2)]
env = environment.Environment(
environment.EnvironmentDescription(
network_type=self.network_type),
host_descriptions)
super(BaseSecurityGroupsSameNetworkTest, self).setUp(env)
def assert_connection(self, *args, **kwargs):
netcat = net_helpers.NetcatTester(*args, **kwargs)
self.addCleanup(netcat.stop_processes)
self.assertTrue(netcat.test_connectivity())
netcat.stop_processes()
def assert_no_connection(self, *args, **kwargs):
netcat = net_helpers.NetcatTester(*args, **kwargs)
self.addCleanup(netcat.stop_processes)
self.assertRaises(RuntimeError, netcat.test_connectivity)
netcat.stop_processes()
class TestSecurityGroupsSameNetwork(BaseSecurityGroupsSameNetworkTest):
l2_agent_type = constants.AGENT_TYPE_OVS
network_type = 'vxlan'
scenarios = [
('hybrid', {'firewall_driver': 'iptables_hybrid',
'of_interface': 'native',
'ovsdb_interface': 'native'}),
('openflow-cli_ovsdb-cli', {'firewall_driver': 'openvswitch',
'of_interface': 'ovs-ofctl',
'ovsdb_interface': 'vsctl'}),
('openflow-native_ovsdb-native', {'firewall_driver': 'openvswitch',
'of_interface': 'native',
'ovsdb_interface': 'native'})]
def test_tcp_securitygroup(self):
"""Tests if a TCP security group rule is working, by confirming
that 1. connection from allowed security group is allowed,
2. connection from elsewhere is blocked,
3. traffic not explicitly allowed (eg. ICMP) is blocked, and
4. a security group update takes effect.
"""
index_to_sg = [0, 0, 1]
if self.firewall_driver == 'iptables_hybrid':
# The iptables_hybrid driver lacks isolation between agents
index_to_host = [0] * 3
else:
index_to_host = [0, 1, 1]
tenant_uuid = uuidutils.generate_uuid()
network = self.safe_client.create_network(tenant_uuid)
self.safe_client.create_subnet(
tenant_uuid, network['id'], '20.0.0.0/24')
sgs = [self.safe_client.create_security_group(tenant_uuid)
for i in range(2)]
ports = [
self.safe_client.create_port(tenant_uuid, network['id'],
self.environment.hosts[host].hostname,
security_groups=[sgs[sg]['id']])
for host, sg in zip(index_to_host, index_to_sg)]
self.safe_client.create_security_group_rule(
tenant_uuid, sgs[0]['id'],
remote_group_id=sgs[0]['id'], direction='ingress',
ethertype=constants.IPv4,
protocol=constants.PROTO_NAME_TCP,
port_range_min=3333, port_range_max=3333)
vms = [
self.useFixture(
machine.FakeFullstackMachine(
self.environment.hosts[host],
network['id'],
tenant_uuid,
self.safe_client,
neutron_port=ports[port]))
for port, host in enumerate(index_to_host)]
for vm in vms:
vm.block_until_boot()
# 1. check if connection from allowed security group is allowed
self.assert_connection(
vms[1].namespace, vms[0].namespace, vms[0].ip, 3333,
net_helpers.NetcatTester.TCP)
# 2. check if connection from elsewhere is blocked
self.assert_no_connection(
vms[2].namespace, vms[0].namespace, vms[0].ip, 3333,
net_helpers.NetcatTester.TCP)
# 3. check if traffic not explicitly allowed (eg. ICMP) is blocked
net_helpers.assert_no_ping(vms[0].namespace, vms[1].ip)
net_helpers.assert_no_ping(vms[0].namespace, vms[2].ip)
net_helpers.assert_no_ping(vms[1].namespace, vms[2].ip)
# 4. check if a security group update takes effect
self.assert_no_connection(
vms[1].namespace, vms[0].namespace, vms[0].ip, 3344,
net_helpers.NetcatTester.TCP)
self.safe_client.create_security_group_rule(
tenant_uuid, sgs[0]['id'],
remote_group_id=sgs[0]['id'], direction='ingress',
ethertype=constants.IPv4,
protocol=constants.PROTO_NAME_TCP,
port_range_min=3344, port_range_max=3344)
self.assert_connection(
vms[1].namespace, vms[0].namespace, vms[0].ip, 3344,
net_helpers.NetcatTester.TCP)