Merge "Run security group scenario tests against stateless security groups too"

This commit is contained in:
Zuul 2023-01-11 17:29:27 +00:00 committed by Gerrit Code Review
commit ad03ebf2da
5 changed files with 396 additions and 168 deletions

View File

@ -994,8 +994,9 @@ class BaseNetworkTest(test.BaseTestCase):
ip_version = ip_version or cls._ip_version
default_params = (
constants.DEFAULT_SECURITY_GROUP_RULE_PARAMS[ip_version])
if ('remote_address_group_id' in kwargs and 'remote_ip_prefix' in
default_params):
if (('remote_address_group_id' in kwargs or
'remote_group_id' in kwargs) and
'remote_ip_prefix' in default_params):
default_params.pop('remote_ip_prefix')
for key, value in default_params.items():
kwargs.setdefault(key, value)

View File

@ -189,6 +189,27 @@ class BaseTempestTestCase(base_api.BaseNetworkTest):
port_range_min=22,
port_range_max=22)
@classmethod
def create_ingress_metadata_secgroup_rule(cls, secgroup_id=None):
"""This rule is intended to permit inbound metadata traffic
Allowing ingress traffic from metadata server, required only for
stateless security groups.
"""
if getattr(cls, 'stateless_sg'):
# NOTE(slaweq): in case of stateless security groups, there is no
# "related" or "established" traffic matching at all so even if
# egress traffic to 169.254.169.254 is allowed by default SG, we
# need to explicitly allow ingress traffic from the metadata server
# to be able to receive responses in the guest vm
cls.create_security_group_rule(
security_group_id=secgroup_id,
direction=neutron_lib_constants.INGRESS_DIRECTION,
protocol=neutron_lib_constants.PROTO_NAME_TCP,
remote_ip_prefix='169.254.169.254/32',
description='metadata out'
)
@classmethod
def create_pingable_secgroup_rule(cls, secgroup_id=None,
client=None):

View File

@ -32,7 +32,7 @@ from neutron_tempest_plugin.scenario import constants as const
CONF = config.CONF
class NetworkSecGroupTest(base.BaseTempestTestCase):
class BaseNetworkSecGroupTest(base.BaseTempestTestCase):
credentials = ['primary', 'admin']
required_extensions = ['router', 'security-group']
@ -70,17 +70,17 @@ class NetworkSecGroupTest(base.BaseTempestTestCase):
@classmethod
def setup_credentials(cls):
super(NetworkSecGroupTest, cls).setup_credentials()
super(BaseNetworkSecGroupTest, cls).setup_credentials()
cls.network_client = cls.os_admin.network_client
@classmethod
def setup_clients(cls):
super(NetworkSecGroupTest, cls).setup_clients()
super(BaseNetworkSecGroupTest, cls).setup_clients()
cls.project_id = cls.os_primary.credentials.tenant_id
@classmethod
def resource_setup(cls):
super(NetworkSecGroupTest, cls).resource_setup()
super(BaseNetworkSecGroupTest, cls).resource_setup()
# setup basic topology for servers we can log into it
cls.network = cls.create_network()
cls.subnet = cls.create_subnet(cls.network)
@ -89,7 +89,7 @@ class NetworkSecGroupTest(base.BaseTempestTestCase):
cls.keypair = cls.create_keypair()
def setUp(self):
super(NetworkSecGroupTest, self).setUp()
super(BaseNetworkSecGroupTest, self).setUp()
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.network_client.reset_quotas, self.project_id)
self.network_client.update_quotas(self.project_id, security_group=-1)
@ -132,12 +132,19 @@ class NetworkSecGroupTest(base.BaseTempestTestCase):
# Add specific remote prefix to VMs and check connectivity
ssh_secgrp_name = data_utils.rand_name('ssh_secgrp')
icmp_secgrp_name = data_utils.rand_name('icmp_secgrp_with_cidr')
sg_kwargs = {}
if self.stateless_sg:
sg_kwargs['stateful'] = False
ssh_secgrp = self.os_primary.network_client.create_security_group(
name=ssh_secgrp_name)
name=ssh_secgrp_name,
**sg_kwargs)
self.create_loginable_secgroup_rule(
secgroup_id=ssh_secgrp['security_group']['id'])
self.create_ingress_metadata_secgroup_rule(
secgroup_id=ssh_secgrp['security_group']['id'])
icmp_secgrp = self.os_primary.network_client.create_security_group(
name=icmp_secgrp_name)
name=icmp_secgrp_name,
**sg_kwargs)
self.create_secgroup_rules(
rule_list, secgroup_id=icmp_secgrp['security_group']['id'])
for sec_grp in (ssh_secgrp, icmp_secgrp):
@ -157,8 +164,7 @@ class NetworkSecGroupTest(base.BaseTempestTestCase):
'fixed_ip_address'], should_succeed=should_succeed,
servers=servers)
@decorators.idempotent_id('3d73ec1a-2ec6-45a9-b0f8-04a283d9d764')
def test_default_sec_grp_scenarios(self):
def _test_default_sec_grp_scenarios(self):
server_ssh_clients, fips, servers = self.create_vm_testing_sec_grp()
# Check ssh connectivity when you add sec group rule, enabling ssh
self.create_loginable_secgroup_rule(
@ -191,11 +197,15 @@ class NetworkSecGroupTest(base.BaseTempestTestCase):
self.check_remote_connectivity(server_ssh_clients[0], ext_net_ip,
servers=servers)
@decorators.idempotent_id('3d73ec1a-2ec6-45a9-b0f8-04a283d9d864')
def test_protocol_number_rule(self):
def _test_protocol_number_rule(self):
# protocol number is added instead of str in security rule creation
name = data_utils.rand_name("test_protocol_number_rule")
security_group = self.create_security_group(name=name)
sg_kwargs = {
'name': name
}
if self.stateless_sg:
sg_kwargs['stateful'] = False
security_group = self.create_security_group(**sg_kwargs)
port = self.create_port(network=self.network, name=name,
security_groups=[security_group['id']])
_, fips, _ = self.create_vm_testing_sec_grp(num_servers=1,
@ -208,19 +218,25 @@ class NetworkSecGroupTest(base.BaseTempestTestCase):
self.create_secgroup_rules(rule_list, secgroup_id=security_group['id'])
self.ping_ip_address(fips[0]['floating_ip_address'])
@decorators.idempotent_id('3d73ec1a-2ec6-45a9-b0f8-04a283d9d964')
def test_two_sec_groups(self):
def _test_two_sec_groups(self):
# add 2 sec groups to VM and test rules of both are working
ssh_secgrp_name = data_utils.rand_name('ssh_secgrp')
icmp_secgrp_name = data_utils.rand_name('icmp_secgrp')
sg_kwargs = {}
if self.stateless_sg:
sg_kwargs['stateful'] = False
ssh_secgrp = self.os_primary.network_client.create_security_group(
name=ssh_secgrp_name)
name=ssh_secgrp_name,
**sg_kwargs)
self.create_loginable_secgroup_rule(
secgroup_id=ssh_secgrp['security_group']['id'])
icmp_secgrp = self.os_primary.network_client.create_security_group(
name=icmp_secgrp_name)
name=icmp_secgrp_name,
**sg_kwargs)
self.create_pingable_secgroup_rule(
secgroup_id=icmp_secgrp['security_group']['id'])
self.create_ingress_metadata_secgroup_rule(
secgroup_id=ssh_secgrp['security_group']['id'])
for sec_grp in (ssh_secgrp, icmp_secgrp):
self.security_groups.append(sec_grp['security_group'])
security_groups_list = [{'name': ssh_secgrp_name},
@ -264,79 +280,22 @@ class NetworkSecGroupTest(base.BaseTempestTestCase):
# make sure ICMP connectivity works after update
self.ping_ip_address(fips[0]['floating_ip_address'])
@decorators.idempotent_id('3d73ec1a-2ec6-45a9-b0f8-04a283d9d664')
def test_ip_prefix(self):
cidr = self.subnet['cidr']
rule_list = [{'protocol': constants.PROTO_NUM_ICMP,
'direction': constants.INGRESS_DIRECTION,
'remote_ip_prefix': cidr}]
self._test_ip_prefix(rule_list, should_succeed=True)
@decorators.attr(type='negative')
@decorators.idempotent_id('a01cd2ef-3cfc-4614-8aac-9d1333ea21dd')
def test_ip_prefix_negative(self):
# define bad CIDR
cidr = '10.100.0.254/32'
rule_list = [{'protocol': constants.PROTO_NUM_ICMP,
'direction': constants.INGRESS_DIRECTION,
'remote_ip_prefix': cidr}]
self._test_ip_prefix(rule_list, should_succeed=False)
@decorators.idempotent_id('01f0ddca-b049-47eb-befd-82acb502c9ec')
def test_established_tcp_session_after_re_attachinging_sg(self):
"""Test existing connection remain open after sg has been re-attached
Verifies that new packets can pass over the existing connection when
the security group has been removed from the server and then added
back
"""
ssh_sg = self.create_security_group()
self.create_loginable_secgroup_rule(secgroup_id=ssh_sg['id'])
vm_ssh, fips, vms = self.create_vm_testing_sec_grp(
security_groups=[{'name': ssh_sg['name']}])
sg = self.create_security_group()
nc_rule = [{'protocol': constants.PROTO_NUM_TCP,
'direction': constants.INGRESS_DIRECTION,
'port_range_min': 6666,
'port_range_max': 6666}]
self.create_secgroup_rules(nc_rule, secgroup_id=sg['id'])
srv_port = self.client.list_ports(network_id=self.network['id'],
device_id=vms[1]['server']['id'])['ports'][0]
srv_ip = srv_port['fixed_ips'][0]['ip_address']
with utils.StatefulConnection(
vm_ssh[0], vm_ssh[1], srv_ip, 6666) as con:
self.client.update_port(srv_port['id'],
security_groups=[ssh_sg['id'], sg['id']])
con.test_connection()
with utils.StatefulConnection(
vm_ssh[0], vm_ssh[1], srv_ip, 6666) as con:
self.client.update_port(
srv_port['id'], security_groups=[ssh_sg['id']])
con.test_connection(should_pass=False)
with utils.StatefulConnection(
vm_ssh[0], vm_ssh[1], srv_ip, 6666) as con:
self.client.update_port(srv_port['id'],
security_groups=[ssh_sg['id'], sg['id']])
con.test_connection()
self.client.update_port(srv_port['id'],
security_groups=[ssh_sg['id']])
con.test_connection(should_pass=False)
self.client.update_port(srv_port['id'],
security_groups=[ssh_sg['id'], sg['id']])
con.test_connection()
@decorators.idempotent_id('7ed39b86-006d-40fb-887a-ae46693dabc9')
def test_remote_group(self):
def _test_remote_group(self):
sg_kwargs = {}
if self.stateless_sg:
sg_kwargs['stateful'] = False
# create a new sec group
ssh_secgrp_name = data_utils.rand_name('ssh_secgrp')
ssh_secgrp = self.os_primary.network_client.create_security_group(
name=ssh_secgrp_name)
name=ssh_secgrp_name,
**sg_kwargs)
# add cleanup
self.security_groups.append(ssh_secgrp['security_group'])
# configure sec group to support SSH connectivity
self.create_loginable_secgroup_rule(
secgroup_id=ssh_secgrp['security_group']['id'])
self.create_ingress_metadata_secgroup_rule(
secgroup_id=ssh_secgrp['security_group']['id'])
# spawn two instances with the sec group created
server_ssh_clients, fips, servers = self.create_vm_testing_sec_grp(
security_groups=[{'name': ssh_secgrp_name}])
@ -363,14 +322,7 @@ class NetworkSecGroupTest(base.BaseTempestTestCase):
self.ping_ip_address(fips[0]['floating_ip_address'],
should_succeed=False)
@testtools.skipUnless(
CONF.neutron_plugin_options.firewall_driver == 'openvswitch',
"Openvswitch agent is required to run this test")
@decorators.idempotent_id('678dd4c0-2953-4626-b89c-8e7e4110ec4b')
@tempest_utils.requires_ext(extension="address-group", service="network")
@tempest_utils.requires_ext(
extension="security-groups-remote-address-group", service="network")
def test_remote_group_and_remote_address_group(self):
def _test_remote_group_and_remote_address_group(self):
"""Test SG rules with remote group and remote address group
This test checks the ICMP connection among two servers using a security
@ -379,10 +331,14 @@ class NetworkSecGroupTest(base.BaseTempestTestCase):
is applied. When both rules are applied (overlapped), removing one of
them should not disable the connection.
"""
sg_kwargs = {}
if self.stateless_sg:
sg_kwargs['stateful'] = False
# create a new sec group
ssh_secgrp_name = data_utils.rand_name('ssh_secgrp')
ssh_secgrp = self.os_primary.network_client.create_security_group(
name=ssh_secgrp_name)
name=ssh_secgrp_name,
**sg_kwargs)
# add cleanup
self.security_groups.append(ssh_secgrp['security_group'])
# configure sec group to support SSH connectivity
@ -452,8 +408,7 @@ class NetworkSecGroupTest(base.BaseTempestTestCase):
self.ping_ip_address(fips[0]['floating_ip_address'],
should_succeed=False)
@decorators.idempotent_id('f07d0159-8f9e-4faa-87f5-a869ab0ad488')
def test_multiple_ports_secgroup_inheritance(self):
def _test_multiple_ports_secgroup_inheritance(self):
"""Test multiple port security group inheritance
This test creates two ports with security groups, then
@ -461,12 +416,18 @@ class NetworkSecGroupTest(base.BaseTempestTestCase):
inherited properly and enforced in these instances.
"""
# create a security group and make it loginable and pingable
sg_kwargs = {}
if self.stateless_sg:
sg_kwargs['stateful'] = False
secgrp = self.os_primary.network_client.create_security_group(
name=data_utils.rand_name('secgrp'))
name=data_utils.rand_name('secgrp'),
**sg_kwargs)
self.create_loginable_secgroup_rule(
secgroup_id=secgrp['security_group']['id'])
self.create_pingable_secgroup_rule(
secgroup_id=secgrp['security_group']['id'])
self.create_ingress_metadata_secgroup_rule(
secgroup_id=secgrp['security_group']['id'])
# add security group to cleanup
self.security_groups.append(secgrp['security_group'])
# create two ports with fixed IPs and the security group created
@ -485,17 +446,30 @@ class NetworkSecGroupTest(base.BaseTempestTestCase):
CONF.validation.image_ssh_user,
self.keypair['private_key'])
@decorators.idempotent_id('f07d0159-8f9e-4faa-87f5-a869ab0ad489')
def test_multiple_ports_portrange_remote(self):
def _test_multiple_ports_portrange_remote(self):
sg_kwargs = {}
initial_security_groups = []
if self.stateless_sg:
sg_kwargs['stateful'] = False
md_secgrp = self.os_primary.network_client.create_security_group(
name=data_utils.rand_name('metadata_secgrp'),
**sg_kwargs)
self.create_ingress_metadata_secgroup_rule(
secgroup_id=md_secgrp['security_group']['id'])
initial_security_groups.append(
{'name': md_secgrp['security_group']['name']})
ssh_clients, fips, servers = self.create_vm_testing_sec_grp(
num_servers=3)
num_servers=3, security_groups=initial_security_groups)
secgroups = []
ports = []
# Create remote and test security groups
for i in range(0, 2):
secgroups.append(
self.create_security_group(name='secgrp-%d' % i))
self.create_security_group(
name='secgrp-%d' % i,
**sg_kwargs))
# configure sec groups to support SSH connectivity
self.create_loginable_secgroup_rule(
secgroup_id=secgroups[-1]['id'])
@ -535,6 +509,21 @@ class NetworkSecGroupTest(base.BaseTempestTestCase):
'port_range_min': '82',
'port_range_max': '83',
'remote_group_id': secgroups[0]['id']}]
if self.stateless_sg:
rule_list.append({
'protocol': constants.PROTO_NUM_TCP,
'direction': constants.EGRESS_DIRECTION,
'remote_group_id': secgroups[0]['id']})
# NOTE(slaweq): in case of stateless SG, client needs to have also
# rule which will explicitly accept ingress connections from
# secgroup[1]
self.create_security_group_rule(
security_group_id=secgroups[0]['id'],
protocol=constants.PROTO_NAME_TCP,
direction=constants.INGRESS_DIRECTION,
remote_group_id=secgroups[1]['id'])
self.create_secgroup_rules(
rule_list, secgroup_id=secgroups[1]['id'])
@ -558,6 +547,170 @@ class NetworkSecGroupTest(base.BaseTempestTestCase):
ssh_clients[0], ssh_clients[2], test_ip, port) as con:
con.test_connection(should_pass=False)
def _test_overlapping_sec_grp_rules(self):
"""Test security group rules with overlapping port ranges"""
sg_kwargs = {}
initial_security_groups = []
if self.stateless_sg:
sg_kwargs['stateful'] = False
md_secgrp = self.os_primary.network_client.create_security_group(
name=data_utils.rand_name('metadata_secgrp'),
**sg_kwargs)
self.create_ingress_metadata_secgroup_rule(
secgroup_id=md_secgrp['security_group']['id'])
initial_security_groups.append(
{'name': md_secgrp['security_group']['name']})
client_ssh, _, vms = self.create_vm_testing_sec_grp(
num_servers=2, security_groups=initial_security_groups)
tmp_ssh, _, tmp_vm = self.create_vm_testing_sec_grp(
num_servers=1, security_groups=initial_security_groups)
srv_ssh = tmp_ssh[0]
srv_vm = tmp_vm[0]
srv_port = self.client.list_ports(network_id=self.network['id'],
device_id=srv_vm['server']['id'])['ports'][0]
srv_ip = srv_port['fixed_ips'][0]['ip_address']
secgrps = []
for i, vm in enumerate(vms):
sg = self.create_security_group(
name='secgrp-%d' % i,
**sg_kwargs)
self.create_loginable_secgroup_rule(secgroup_id=sg['id'])
port = self.client.list_ports(network_id=self.network['id'],
device_id=vm['server']['id'])['ports'][0]
self.client.update_port(port['id'], security_groups=[sg['id']])
secgrps.append(sg)
tcp_port = 3000
rule_list = [{'protocol': constants.PROTO_NUM_TCP,
'direction': constants.INGRESS_DIRECTION,
'port_range_min': tcp_port,
'port_range_max': tcp_port,
'remote_group_id': secgrps[0]['id']},
{'protocol': constants.PROTO_NUM_TCP,
'direction': constants.INGRESS_DIRECTION,
'port_range_min': tcp_port,
'port_range_max': tcp_port + 2,
'remote_group_id': secgrps[1]['id']}]
self.client.update_port(srv_port['id'],
security_groups=[secgrps[0]['id'], secgrps[1]['id']])
self.create_secgroup_rules(rule_list, secgroup_id=secgrps[0]['id'])
if self.stateless_sg:
# NOTE(slaweq): in case of stateless SG, client needs to have also
# rule which will explicitly accept ingress TCP connections which
# will be replies from the TCP server so it will use random
# destination port (depends on the src port choosen by client while
# establishing connection)
self.create_security_group_rule(
security_group_id=secgrps[0]['id'],
protocol=constants.PROTO_NAME_TCP,
direction=constants.INGRESS_DIRECTION)
self.create_security_group_rule(
security_group_id=secgrps[1]['id'],
protocol=constants.PROTO_NAME_TCP,
direction=constants.INGRESS_DIRECTION)
# The conntrack entries are ruled by the OF definitions but conntrack
# status can change the datapath. Let's check the rules in two
# attempts
for _ in range(2):
with utils.StatefulConnection(
client_ssh[0], srv_ssh, srv_ip, tcp_port) as con:
con.test_connection()
for port in range(tcp_port, tcp_port + 3):
with utils.StatefulConnection(
client_ssh[1], srv_ssh, srv_ip, port) as con:
con.test_connection()
def _test_remove_sec_grp_from_active_vm(self):
"""Tests the following:
1. Create SG associated with ICMP rule
2. Create Port (assoiated to SG #1) and use it to create the VM
3. Ping the VM, expected should be PASS
4. Remove the security group from VM by Port update
5. Ping the VM, expected should be FAIL
"""
sec_grp_name = data_utils.rand_name('test_sg')
sg_kwargs = {
'name': sec_grp_name
}
if self.stateless_sg:
sg_kwargs['stateful'] = False
secgrp = self.os_primary.network_client.create_security_group(
**sg_kwargs)
self.security_groups.append(secgrp['security_group'])
sec_grp_id = secgrp['security_group']['id']
self.create_pingable_secgroup_rule(sec_grp_id)
ex_port = self.create_port(
self.network, fixed_ips=[{'subnet_id': self.subnet['id']}],
security_groups=[sec_grp_id])
fip = self.create_vm_testing_sec_grp(
num_servers=1, security_groups=[{'name': sec_grp_name}],
ports=[ex_port])[1][0]
self.ping_ip_address(fip['floating_ip_address'])
self.client.update_port(ex_port['id'],
security_groups=[])
self.ping_ip_address(fip['floating_ip_address'],
should_succeed=False)
class StatefulNetworkSecGroupTest(BaseNetworkSecGroupTest):
stateless_sg = False
@decorators.idempotent_id('3d73ec1a-2ec6-45a9-b0f8-04a283d9d764')
def test_default_sec_grp_scenarios(self):
self._test_default_sec_grp_scenarios()
@decorators.idempotent_id('3d73ec1a-2ec6-45a9-b0f8-04a283d9d864')
def test_protocol_number_rule(self):
self._test_protocol_number_rule()
@decorators.idempotent_id('3d73ec1a-2ec6-45a9-b0f8-04a283d9d964')
def test_two_sec_groups(self):
self._test_two_sec_groups()
@decorators.idempotent_id('3d73ec1a-2ec6-45a9-b0f8-04a283d9d664')
def test_ip_prefix(self):
cidr = self.subnet['cidr']
rule_list = [{'protocol': constants.PROTO_NUM_ICMP,
'direction': constants.INGRESS_DIRECTION,
'remote_ip_prefix': cidr}]
self._test_ip_prefix(rule_list, should_succeed=True)
@decorators.attr(type='negative')
@decorators.idempotent_id('a01cd2ef-3cfc-4614-8aac-9d1333ea21dd')
def test_ip_prefix_negative(self):
# define bad CIDR
cidr = '10.100.0.254/32'
rule_list = [{'protocol': constants.PROTO_NUM_ICMP,
'direction': constants.INGRESS_DIRECTION,
'remote_ip_prefix': cidr}]
self._test_ip_prefix(rule_list, should_succeed=False)
@decorators.idempotent_id('7ed39b86-006d-40fb-887a-ae46693dabc9')
def test_remote_group(self):
self._test_remote_group()
@testtools.skipUnless(
CONF.neutron_plugin_options.firewall_driver == 'openvswitch',
"Openvswitch agent is required to run this test")
@decorators.idempotent_id('678dd4c0-2953-4626-b89c-8e7e4110ec4b')
@tempest_utils.requires_ext(extension="address-group", service="network")
@tempest_utils.requires_ext(
extension="security-groups-remote-address-group", service="network")
def test_remote_group_and_remote_address_group(self):
self._test_remote_group_and_remote_address_group()
@decorators.idempotent_id('f07d0159-8f9e-4faa-87f5-a869ab0ad488')
def test_multiple_ports_secgroup_inheritance(self):
self._test_multiple_ports_secgroup_inheritance()
@decorators.idempotent_id('f07d0159-8f9e-4faa-87f5-a869ab0ad489')
def test_multiple_ports_portrange_remote(self):
self._test_multiple_ports_portrange_remote()
@decorators.idempotent_id('f07d0159-8f9e-4faa-87f5-a869ab0ad490')
def test_intra_sg_isolation(self):
"""Test intra security group isolation
@ -570,8 +723,11 @@ class NetworkSecGroupTest(base.BaseTempestTestCase):
"""
# create a security group and make it loginable
secgrp_name = data_utils.rand_name('secgrp')
sg_kwargs = {
'name': secgrp_name
}
secgrp = self.os_primary.network_client.create_security_group(
name=secgrp_name)
**sg_kwargs)
secgrp_id = secgrp['security_group']['id']
# add security group to cleanup
self.security_groups.append(secgrp['security_group'])
@ -633,74 +789,120 @@ class NetworkSecGroupTest(base.BaseTempestTestCase):
@decorators.idempotent_id('cd66b826-d86c-4fb4-ab37-17c8391753cb')
def test_overlapping_sec_grp_rules(self):
"""Test security group rules with overlapping port ranges"""
client_ssh, _, vms = self.create_vm_testing_sec_grp(num_servers=2)
tmp_ssh, _, tmp_vm = self.create_vm_testing_sec_grp(num_servers=1)
srv_ssh = tmp_ssh[0]
srv_vm = tmp_vm[0]
srv_port = self.client.list_ports(network_id=self.network['id'],
device_id=srv_vm['server']['id'])['ports'][0]
srv_ip = srv_port['fixed_ips'][0]['ip_address']
secgrps = []
for i, vm in enumerate(vms):
sg = self.create_security_group(name='secgrp-%d' % i)
self.create_loginable_secgroup_rule(secgroup_id=sg['id'])
port = self.client.list_ports(network_id=self.network['id'],
device_id=vm['server']['id'])['ports'][0]
self.client.update_port(port['id'], security_groups=[sg['id']])
secgrps.append(sg)
tcp_port = 3000
rule_list = [{'protocol': constants.PROTO_NUM_TCP,
'direction': constants.INGRESS_DIRECTION,
'port_range_min': tcp_port,
'port_range_max': tcp_port,
'remote_group_id': secgrps[0]['id']},
{'protocol': constants.PROTO_NUM_TCP,
'direction': constants.INGRESS_DIRECTION,
'port_range_min': tcp_port,
'port_range_max': tcp_port + 2,
'remote_group_id': secgrps[1]['id']}]
self.client.update_port(srv_port['id'],
security_groups=[secgrps[0]['id'], secgrps[1]['id']])
self.create_secgroup_rules(rule_list, secgroup_id=secgrps[0]['id'])
# The conntrack entries are ruled by the OF definitions but conntrack
# status can change the datapath. Let's check the rules in two
# attempts
for _ in range(2):
with utils.StatefulConnection(
client_ssh[0], srv_ssh, srv_ip, tcp_port) as con:
con.test_connection()
for port in range(tcp_port, tcp_port + 3):
with utils.StatefulConnection(
client_ssh[1], srv_ssh, srv_ip, port) as con:
con.test_connection()
self._test_overlapping_sec_grp_rules()
@decorators.idempotent_id('96dcd5ff-9d45-4e0d-bea0-0b438cbd388f')
def test_remove_sec_grp_from_active_vm(self):
"""Tests the following:
self._test_remove_sec_grp_from_active_vm()
1. Create SG associated with ICMP rule
2. Create Port (assoiated to SG #1) and use it to create the VM
3. Ping the VM, expected should be PASS
4. Remove the security group from VM by Port update
5. Ping the VM, expected should be FAIL
@decorators.idempotent_id('01f0ddca-b049-47eb-befd-82acb502c9ec')
def test_established_tcp_session_after_re_attachinging_sg(self):
"""Test existing connection remain open after sg has been re-attached
Verifies that new packets can pass over the existing connection when
the security group has been removed from the server and then added
back
"""
sec_grp_name = data_utils.rand_name('test_sg')
secgrp = self.os_primary.network_client.create_security_group(
name=sec_grp_name)
self.security_groups.append(secgrp['security_group'])
sec_grp_id = secgrp['security_group']['id']
self.create_pingable_secgroup_rule(sec_grp_id)
ex_port = self.create_port(
self.network, fixed_ips=[{'subnet_id': self.subnet['id']}],
security_groups=[sec_grp_id])
fip = self.create_vm_testing_sec_grp(
num_servers=1, security_groups=[{'name': sec_grp_name}],
ports=[ex_port])[1][0]
sg_kwargs = {}
if self.stateless_sg:
sg_kwargs['stateful'] = False
ssh_sg = self.create_security_group(**sg_kwargs)
self.create_loginable_secgroup_rule(secgroup_id=ssh_sg['id'])
vm_ssh, fips, vms = self.create_vm_testing_sec_grp(
security_groups=[{'name': ssh_sg['name']}])
sg = self.create_security_group(**sg_kwargs)
nc_rule = [{'protocol': constants.PROTO_NUM_TCP,
'direction': constants.INGRESS_DIRECTION,
'port_range_min': 6666,
'port_range_max': 6666}]
self.create_secgroup_rules(nc_rule, secgroup_id=sg['id'])
srv_port = self.client.list_ports(network_id=self.network['id'],
device_id=vms[1]['server']['id'])['ports'][0]
srv_ip = srv_port['fixed_ips'][0]['ip_address']
with utils.StatefulConnection(
vm_ssh[0], vm_ssh[1], srv_ip, 6666) as con:
self.client.update_port(srv_port['id'],
security_groups=[ssh_sg['id'], sg['id']])
con.test_connection()
with utils.StatefulConnection(
vm_ssh[0], vm_ssh[1], srv_ip, 6666) as con:
self.client.update_port(
srv_port['id'], security_groups=[ssh_sg['id']])
con.test_connection(should_pass=False)
with utils.StatefulConnection(
vm_ssh[0], vm_ssh[1], srv_ip, 6666) as con:
self.client.update_port(srv_port['id'],
security_groups=[ssh_sg['id'], sg['id']])
con.test_connection()
self.client.update_port(srv_port['id'],
security_groups=[ssh_sg['id']])
con.test_connection(should_pass=False)
self.client.update_port(srv_port['id'],
security_groups=[ssh_sg['id'], sg['id']])
con.test_connection()
self.ping_ip_address(fip['floating_ip_address'])
self.client.update_port(ex_port['id'],
security_groups=[])
self.ping_ip_address(fip['floating_ip_address'],
should_succeed=False)
class StatelessNetworkSecGroupTest(BaseNetworkSecGroupTest):
required_extensions = ['security-group', 'stateful-security-group']
stateless_sg = True
@decorators.idempotent_id('9e193e3f-56f2-4f4e-886c-988a147958ef')
def test_default_sec_grp_scenarios(self):
self._test_default_sec_grp_scenarios()
@decorators.idempotent_id('afae8654-a389-4887-b21d-7f07ec350177')
def test_protocol_number_rule(self):
self._test_protocol_number_rule()
@decorators.idempotent_id('b51cc0eb-8f9a-49e7-96ab-61cd31243b67')
def test_two_sec_groups(self):
self._test_two_sec_groups()
@decorators.idempotent_id('07985496-58da-4c1f-a6ef-2fdd88128a81')
def test_ip_prefix(self):
cidr = self.subnet['cidr']
rule_list = [{'protocol': constants.PROTO_NUM_ICMP,
'direction': constants.INGRESS_DIRECTION,
'remote_ip_prefix': cidr}]
self._test_ip_prefix(rule_list, should_succeed=True)
@decorators.attr(type='negative')
@decorators.idempotent_id('1ad469c4-0d8f-42ae-8ec3-46cc424565c4')
def test_ip_prefix_negative(self):
# define bad CIDR
cidr = '10.100.0.254/32'
rule_list = [{'protocol': constants.PROTO_NUM_ICMP,
'direction': constants.INGRESS_DIRECTION,
'remote_ip_prefix': cidr}]
self._test_ip_prefix(rule_list, should_succeed=False)
@decorators.idempotent_id('fa1e93bf-67c5-4590-9962-38ee1f43a46a')
def test_remote_group(self):
self._test_remote_group()
@testtools.skipUnless(
CONF.neutron_plugin_options.firewall_driver == 'openvswitch',
"Openvswitch agent is required to run this test")
@decorators.idempotent_id('9fae530d-2711-4c61-a4a5-8efe6e58ab14')
@tempest_utils.requires_ext(extension="address-group", service="network")
@tempest_utils.requires_ext(
extension="security-groups-remote-address-group", service="network")
def test_remote_group_and_remote_address_group(self):
self._test_remote_group_and_remote_address_group()
@decorators.idempotent_id('4f1eb6db-ae7f-4f26-b371-cbd8363f9b0b')
def test_multiple_ports_secgroup_inheritance(self):
self._test_multiple_ports_secgroup_inheritance()
@decorators.idempotent_id('4043ca0a-eabb-4198-be53-3d3051cc0804')
def test_multiple_ports_portrange_remote(self):
self._test_multiple_ports_portrange_remote()
@decorators.idempotent_id('bfe25138-ceac-4944-849a-b9b90aff100f')
def test_overlapping_sec_grp_rules(self):
self._test_overlapping_sec_grp_rules()
@decorators.idempotent_id('e4340e47-39cd-49ed-967c-fc2c40b47c5a')
def test_remove_sec_grp_from_active_vm(self):
self._test_remove_sec_grp_from_active_vm()

View File

@ -338,7 +338,8 @@
# bug https://bugzilla.redhat.com/show_bug.cgi?id=1965036 will be fixed
tempest_exclude_regex: "\
(^neutron_tempest_plugin.scenario.test_trunk.TrunkTest.test_subport_connectivity)|\
(^neutron_tempest_plugin.scenario.test_security_groups.NetworkSecGroupTest.test_established_tcp_session_after_re_attachinging_sg)"
(^neutron_tempest_plugin.scenario.test_security_groups.StatefulNetworkSecGroupTest.test_established_tcp_session_after_re_attachinging_sg)|\
(^neutron_tempest_plugin.scenario.test_security_groups.StatelessNetworkSecGroupTest.test_established_tcp_session_after_re_attachinging_sg)"
devstack_localrc:
Q_AGENT: openvswitch
Q_ML2_TENANT_NETWORK_TYPE: vxlan
@ -487,7 +488,8 @@
# fixed
tempest_exclude_regex: "\
(^neutron_tempest_plugin.scenario.test_vlan_transparency.VlanTransparencyTest)|\
(^neutron_tempest_plugin.scenario.test_security_groups.NetworkSecGroupTest.test_established_tcp_session_after_re_attachinging_sg)|\
(^neutron_tempest_plugin.scenario.test_security_groups.StatefulNetworkSecGroupTest.test_established_tcp_session_after_re_attachinging_sg)|\
(^neutron_tempest_plugin.scenario.test_security_groups.StatelessNetworkSecGroupTest.test_established_tcp_session_after_re_attachinging_sg)|\
(^neutron_tempest_plugin.scenario.test_floatingip.FloatingIPPortDetailsTest.test_floatingip_port_details)"
devstack_localrc:
Q_AGENT: linuxbridge

View File

@ -122,7 +122,8 @@
# bug https://bugzilla.redhat.com/show_bug.cgi?id=1965036 will be fixed
tempest_exclude_regex: "\
(^neutron_tempest_plugin.scenario.test_trunk.TrunkTest.test_subport_connectivity)|\
(^neutron_tempest_plugin.scenario.test_security_groups.NetworkSecGroupTest.test_established_tcp_session_after_re_attachinging_sg)"
(^neutron_tempest_plugin.scenario.test_security_groups.StatefulNetworkSecGroupTest.test_established_tcp_session_after_re_attachinging_sg)|\
(^neutron_tempest_plugin.scenario.test_security_groups.StatelessNetworkSecGroupTest.test_established_tcp_session_after_re_attachinging_sg)"
network_available_features: *available_features
devstack_localrc:
NETWORK_API_EXTENSIONS: "{{ (network_api_extensions_common + network_api_extensions_openvswitch) | join(',') }}"
@ -146,7 +147,8 @@
(^tempest.api.compute.servers.test_multiple_create)"
tempest_exclude_regex: "\
(^neutron_tempest_plugin.scenario.test_vlan_transparency.VlanTransparencyTest)|\
(^neutron_tempest_plugin.scenario.test_security_groups.NetworkSecGroupTest.test_established_tcp_session_after_re_attachinging_sg)|\
(^neutron_tempest_plugin.scenario.test_security_groups.StatefulNetworkSecGroupTest.test_established_tcp_session_after_re_attachinging_sg)|\
(^neutron_tempest_plugin.scenario.test_security_groups.StatelessNetworkSecGroupTest.test_established_tcp_session_after_re_attachinging_sg)|\
(^neutron_tempest_plugin.scenario.test_floatingip.FloatingIPPortDetailsTest.test_floatingip_port_details)"
network_available_features: *available_features
devstack_localrc: