fullstack: add some more security group tests

The added tests check if a remote SG member addition works and
a SG rule removal stops an established connection.  These tests
piggyback on the existing test method in order to work around
the lack of isolation between tests.

Change-Id: I43bae9b320528ecdf3ec60881613ab1028e90144
This commit is contained in:
IWAMOTO Toshihiro 2016-08-01 06:49:11 +00:00
parent e3258fcd64
commit 88ee81dfaa
2 changed files with 59 additions and 2 deletions

View File

@ -498,6 +498,12 @@ class NetcatTester(object):
return message == testing_string
def test_no_connectivity(self, respawn=False):
try:
return not self.test_connectivity(respawn)
except RuntimeError:
return True
def _spawn_nc_in_namespace(self, namespace, address, listen=False):
cmd = ['nc', address, self.dst_port]
if self.protocol == self.UDP:

View File

@ -16,6 +16,7 @@ from neutron_lib import constants
from oslo_utils import uuidutils
from neutron.cmd.sanity import checks
from neutron.common import utils as common_utils
from neutron.tests.common import net_helpers
from neutron.tests.fullstack import base
from neutron.tests.fullstack.resources import environment
@ -86,12 +87,17 @@ class TestSecurityGroupsSameNetwork(BaseSecurityGroupsSameNetworkTest):
'of_interface': 'native',
'ovsdb_interface': 'native'})]
# NOTE(toshii): As a firewall_driver can interfere with others,
# the recommended way to add test is to expand this method, not
# adding another.
def test_tcp_securitygroup(self):
"""Tests if a TCP security group rule is working, by confirming
that 1. connection from allowed security group is allowed,
2. connection from elsewhere is blocked,
3. traffic not explicitly allowed (eg. ICMP) is blocked, and
4. a security group update takes effect.
3. traffic not explicitly allowed (eg. ICMP) is blocked,
4. a security group update takes effect,
5. a remote security group member addition works, and
6. an established connection stops by deleting a SG rule.
"""
index_to_sg = [0, 0, 1]
if self.firewall_driver == 'iptables_hybrid':
@ -164,3 +170,48 @@ class TestSecurityGroupsSameNetwork(BaseSecurityGroupsSameNetworkTest):
self.assert_connection(
vms[1].namespace, vms[0].namespace, vms[0].ip, 3344,
net_helpers.NetcatTester.TCP)
# 5. check if a remote security group member addition works
rule2 = self.safe_client.create_security_group_rule(
tenant_uuid, sgs[0]['id'],
remote_group_id=sgs[1]['id'], direction='ingress',
ethertype=constants.IPv4,
protocol=constants.PROTO_NAME_TCP,
port_range_min=3355, port_range_max=3355)
self.assert_connection(
vms[2].namespace, vms[0].namespace, vms[0].ip, 3355,
net_helpers.NetcatTester.TCP)
index_to_host.append(index_to_host[2])
index_to_sg.append(1)
ports.append(
self.safe_client.create_port(tenant_uuid, network['id'],
self.environment.hosts[
index_to_host[3]].hostname,
security_groups=[sgs[1]['id']]))
vms.append(
self.useFixture(
machine.FakeFullstackMachine(
self.environment.hosts[index_to_host[3]],
network['id'],
tenant_uuid,
self.safe_client,
neutron_port=ports[3])))
vms[3].block_until_boot()
netcat = net_helpers.NetcatTester(vms[3].namespace,
vms[0].namespace, vms[0].ip, 3355,
net_helpers.NetcatTester.TCP)
self.addCleanup(netcat.stop_processes)
self.assertTrue(netcat.test_connectivity())
# 6. check if an established connection stops by deleting
# the supporting SG rule.
self.client.delete_security_group_rule(rule2['id'])
common_utils.wait_until_true(lambda: netcat.test_no_connectivity(),
sleep=8)
netcat.stop_processes()