[Stateless SG] Ensure replies won't work without ingress rule

This patch extends existing
test_connectivity_between_vms_using_different_sec_groups
so that, for stateless security groups, it verifies that replies from the
server to the client do not work without explicitly adding an SG rule that
allows ingress traffic on the ephemeral port range.

Change-Id: I940678c15b65552634096ea3e72888b5bf830912
Slawek Kaplonski 2023-03-10 11:31:34 +01:00
parent b327a6342f
commit 365373910c


@ -26,6 +26,7 @@ from tempest.lib import decorators
from neutron_tempest_plugin.common import ssh from neutron_tempest_plugin.common import ssh
from neutron_tempest_plugin.common import utils from neutron_tempest_plugin.common import utils
from neutron_tempest_plugin import config from neutron_tempest_plugin import config
from neutron_tempest_plugin import exceptions
from neutron_tempest_plugin.scenario import base from neutron_tempest_plugin.scenario import base
from neutron_tempest_plugin.scenario import constants as const from neutron_tempest_plugin.scenario import constants as const
@ -166,15 +167,6 @@ class BaseNetworkSecGroupTest(base.BaseTempestTestCase):
if self.stateless_sg: if self.stateless_sg:
self.create_ingress_metadata_secgroup_rule( self.create_ingress_metadata_secgroup_rule(
secgroup_id=sg['id']) secgroup_id=sg['id'])
if sg_name == 'client':
# NOTE(slaweq): In case of stateless SG we need also SG
# rule to accept response from server to client,
self.create_security_group_rule(
security_group_id=sg['id'],
protocol=constants.PROTO_NAME_TCP,
direction=constants.INGRESS_DIRECTION,
port_range_min=EPHEMERAL_PORT_RANGE['min'],
port_range_max=EPHEMERAL_PORT_RANGE['max'])
security_groups[sg_name] = sg security_groups[sg_name] = sg
# NOTE(slaweq): we need to iterate over create_vm_testing_sec_grp as # NOTE(slaweq): we need to iterate over create_vm_testing_sec_grp as
@ -202,17 +194,35 @@ class BaseNetworkSecGroupTest(base.BaseTempestTestCase):
def _message_received(server_ssh_client, client_ssh_client, def _message_received(server_ssh_client, client_ssh_client,
dest_fip, servers): dest_fip, servers):
expected_msg = "Test_msg" expected_msg = "Test_msg"
utils.kill_nc_process(server_ssh_client)
self.nc_listen(server_ssh_client, self.nc_listen(server_ssh_client,
TEST_TCP_PORT, TEST_TCP_PORT,
constants.PROTO_NAME_TCP, constants.PROTO_NAME_TCP,
expected_msg, expected_msg,
list(servers.values())) list(servers.values()))
try:
received_msg = self.nc_client( received_msg = self.nc_client(
dest_fip, dest_fip,
TEST_TCP_PORT, TEST_TCP_PORT,
constants.PROTO_NAME_TCP, constants.PROTO_NAME_TCP,
ssh_client=client_ssh_client) ssh_client=client_ssh_client)
return expected_msg in received_msg return received_msg and expected_msg in received_msg
except exceptions.ShellCommandFailed:
return False
if self.stateless_sg:
# In case of stateless SG connectivity will not work without
# explicit allow ingress response from server to client
utils.wait_until_true(
lambda: not _message_received(
ssh_clients['server'], ssh_clients['client'],
fips['server']['fixed_ip_address'], servers))
self.create_security_group_rule(
security_group_id=security_groups['client']['id'],
protocol=constants.PROTO_NAME_TCP,
direction=constants.INGRESS_DIRECTION,
port_range_min=EPHEMERAL_PORT_RANGE['min'],
port_range_max=EPHEMERAL_PORT_RANGE['max'])
utils.wait_until_true( utils.wait_until_true(
lambda: _message_received( lambda: _message_received(