Merge "[Stateless SG] Add test to check there's no SG entries in conntrack"

This commit is contained in:
Zuul 2023-03-22 17:09:31 +00:00 committed by Gerrit Code Review
commit daff0d1ef4
6 changed files with 168 additions and 16 deletions

View File

@ -27,6 +27,7 @@ from tobiko.openstack.neutron import _subnet
SERVER = 'neutron-server'
METADATA_IPv4 = '169.254.169.254'
DHCP_AGENT = _agent.DHCP_AGENT
L3_AGENT = _agent.L3_AGENT
METADATA_AGENT = _agent.METADATA_AGENT
@ -39,7 +40,10 @@ STATEFUL_OVN_ACTION = _security_group.STATEFUL_OVN_ACTION
STATELESS_OVN_ACTION = _security_group.STATELESS_OVN_ACTION
AgentNotFoundOnHost = _agent.AgentNotFoundOnHost
NotFound = _client.NotFound
NeutronAgentType = _agent.NeutronAgentType
SecurityGroupType = _security_group.SecurityGroupType
SecurityGroupIdOrNameType = _security_group.SecurityGroupIdOrNameType
get_l3_agent_mode = _agent.get_l3_agent_mode
find_l3_agent_hosting_router = _agent.find_l3_agent_hosting_router
list_agents = _agent.list_agents
@ -137,8 +141,9 @@ SubnetIdType = _subnet.SubnetIdType
NoSuchSubnet = _subnet.NoSuchSubnet
list_security_groups = _security_group.list_security_groups
get_security_group_by_id = _security_group.get_security_group_by_id
get_security_group = _security_group.get_security_group
get_default_security_group = _security_group.get_default_security_group
create_security_group = _security_group.create_security_group
update_security_group = _security_group.update_security_group
delete_security_group = _security_group.delete_security_group
create_security_group_rule = _security_group.create_security_group_rule

View File

@ -29,7 +29,7 @@ STATEFUL_OVN_ACTION = "allow-related"
STATELESS_OVN_ACTION = "allow-stateless"
SecurityGroupType = typing.Dict[str, typing.Any]
SecurityGroupIdType = typing.Union[str, SecurityGroupType]
SecurityGroupIdOrNameType = typing.Union[str, SecurityGroupType]
SecurityGroupRuleType = typing.Dict[str, typing.Any]
SecurityGroupRuleIdType = typing.Union[str, SecurityGroupRuleType]
@ -43,12 +43,12 @@ def list_security_groups(client=None, **params) \
return tobiko.Selection[SecurityGroupType](security_groups)
def get_security_group_by_id(sg_id: SecurityGroupIdType,
client: _client.NeutronClientType = None,
**params) \
def get_security_group(sg: SecurityGroupIdOrNameType,
client: _client.NeutronClientType = None,
**params) \
-> SecurityGroupType:
return _client.neutron_client(client).show_security_group(
sg_id, **params
sg, **params
)['security_group']
@ -75,7 +75,7 @@ def create_security_group(client=None, add_cleanup=True,
return sg
def update_security_group(sg_id: SecurityGroupIdType,
def update_security_group(sg_id: SecurityGroupIdOrNameType,
client: _client.NeutronClientType = None,
**params) \
-> SecurityGroupType:
@ -85,7 +85,7 @@ def update_security_group(sg_id: SecurityGroupIdType,
)['security_group']
def delete_security_group(sg_id: SecurityGroupIdType,
def delete_security_group(sg_id: SecurityGroupIdOrNameType,
should_exists: bool = False,
client: _client.NeutronClientType = None):
try:

View File

@ -75,6 +75,7 @@ NetworkWithNetMtuWriteStackFixture = (
RouterInterfaceStackFixture = _neutron.RouterInterfaceStackFixture
RouterStackFixture = _neutron.RouterStackFixture
SecurityGroupsFixture = _neutron.SecurityGroupsFixture
StatelessSecurityGroupFixture = _neutron.StatelessSecurityGroupFixture
get_external_network = _neutron.get_external_network
has_external_network = _neutron.has_external_network
skip_unless_has_external_network = _neutron.skip_unless_has_external_network

View File

@ -489,6 +489,100 @@ class SecurityGroupsFixture(heat.HeatStackFixture):
#: Heat template file
template = _hot.heat_template_file('neutron/security_groups.yaml')
def __init__(self, stateful: bool = True):
self._stateful = stateful
super(SecurityGroupsFixture, self).__init__()
@property
def stateful(self):
return self._stateful
@neutron.skip_if_missing_networking_extensions('stateful-security-group')
class StatelessSecurityGroupFixture(tobiko.SharedFixture):
"""Neutron Stateless Security Group Fixture.
This SG will by default allow SSH and ICMP to the instance and also
ingress traffic from the metadata service as it can't rely on conntrack.
"""
name: typing.Optional[str] = None
description: typing.Optional[str] = ""
rules = [
{
'protocol': 'tcp',
'port_range_min': 22,
'port_range_max': 22,
'direction': 'ingress'
}, {
'protocol': 'icmp',
'direction': 'ingress'
}, {
'protocol': 'tcp',
'remote_ip_prefix': '%s/32' % neutron.METADATA_IPv4,
'direction': 'ingress'
}
]
_security_group: typing.Optional[neutron.SecurityGroupType] = None
def __init__(self,
name: typing.Optional[str] = None,
description: typing.Optional[str] = None,
rules: typing.Optional[list] = None):
self.name = name or self.fixture_name
if description:
self.description = description
if rules:
self.rules = rules
super(StatelessSecurityGroupFixture, self).__init__()
def setup_fixture(self):
if not self.security_group:
self._security_group = neutron.create_security_group(
name=self.name, description=self.description,
add_cleanup=False, stateful=False)
if self.security_group:
for rule in self.rules:
neutron.create_security_group_rule(
self._security_group['id'],
add_cleanup=False,
**rule)
tobiko.addme_to_shared_resource(__name__, self.name)
def cleanup_fixture(self):
n_tests_using_stack = len(tobiko.removeme_from_shared_resource(
__name__, self.name))
if n_tests_using_stack == 0:
self._cleanup_security_group()
else:
LOG.info('Security Group %r not deleted because %d tests '
'are using it still.',
self.name, n_tests_using_stack)
def _cleanup_security_group(self):
sg_id = self.security_group_id
if sg_id:
self._security_group = None
LOG.debug('Deleting Security Group %r (%r)...',
self.name, sg_id)
neutron.delete_security_group(sg_id)
LOG.debug('Security Group %r (%r) deleted.', self.name, sg_id)
@property
def security_group_id(self):
if self.security_group:
return self._security_group['id']
@property
def security_group(self):
if not self._security_group:
try:
self._security_group = neutron.get_security_group(self.name)
except neutron.NotFound:
LOG.debug("Security group %r not found.", self.name)
self._security_group = None
return self._security_group
def list_external_networks(name: str = None) -> \
tobiko.Selection[neutron.NetworkType]:

View File

@ -725,9 +725,8 @@ class MetadataAgentTest(BaseAgentTest):
if is_reachable not in [True, False]:
raise TypeError("'is_reachable' parameter is not a bool: "
f"{is_reachable!r}")
# TODO: fix hard coded IP address
metadata_url = (metadata_url or
'http://169.254.169.254/latest/meta-data/')
'http://%s/latest/meta-data/' % neutron.METADATA_IPv4)
try:
result = sh.execute(f"curl '{metadata_url}' -I",

View File

@ -15,6 +15,7 @@
from __future__ import absolute_import
import json
import typing
from oslo_log import log
import testtools
@ -32,10 +33,6 @@ LOG = log.getLogger(__name__)
class BaseSecurityGroupTest(testtools.TestCase):
#: Resources stack with Nova server to send messages to
stack = tobiko.required_fixture(
stacks.CirrosServerWithDefaultSecurityGroupStackFixture)
_ovn_nb_db = None
_host_ssh_client = None
_container_runtime_name = None
@ -128,6 +125,10 @@ class BaseSecurityGroupTest(testtools.TestCase):
@neutron.skip_if_missing_networking_extensions('stateful-security-group')
class StatelessSecurityGroupTest(BaseSecurityGroupTest):
#: Resources stack with Nova server to send messages to
stack = tobiko.required_fixture(
stacks.CirrosServerWithDefaultSecurityGroupStackFixture)
def test_default_security_group_is_stateful(self):
"""Test that default security group is always stateful.
@ -203,7 +204,7 @@ class StatelessSecurityGroupTest(BaseSecurityGroupTest):
# Update to stateless
neutron.update_security_group(sg['id'], stateful=False)
sg = neutron.get_security_group_by_id(sg['id'])
sg = neutron.get_security_group(sg['id'])
self.assertFalse(sg['stateful'])
self._check_sg_rules_in_ovn_nb_db(sg, neutron.STATELESS_OVN_ACTION)
new_rule = neutron.create_security_group_rule(
@ -220,7 +221,7 @@ class StatelessSecurityGroupTest(BaseSecurityGroupTest):
# And get back to stateful
neutron.update_security_group(sg['id'], stateful=True)
sg = neutron.get_security_group_by_id(sg['id'])
sg = neutron.get_security_group(sg['id'])
self.assertTrue(sg['stateful'])
self._check_sg_rules_in_ovn_nb_db(sg, neutron.STATEFUL_OVN_ACTION)
new_rule = neutron.create_security_group_rule(
@ -266,3 +267,55 @@ class StatelessSecurityGroupTest(BaseSecurityGroupTest):
)
self._check_sg_rule_in_ovn_nb_db(new_rule['id'],
neutron.STATELESS_OVN_ACTION)
@neutron.skip_if_missing_networking_extensions('port-security',
'stateful-security-group')
class CirrosServerWithStatelessSecurityGroupFixture(
stacks.CirrosServerStackFixture):
"""Heat stack for testing a floating IP instance with port security"""
#: Resources stack with security group to allow ping Nova servers
security_groups_stack = tobiko.required_fixture(
stacks.StatelessSecurityGroupFixture)
@property
def security_groups(self) -> typing.List[str]:
"""List with Stateless security group"""
return [self.security_groups_stack.security_group_id]
@neutron.skip_if_missing_networking_extensions('stateful-security-group')
class StatelessSecurityGroupInstanceTest(BaseSecurityGroupTest):
#: Resources stack with Nova server to send messages to
vm = tobiko.required_fixture(
CirrosServerWithStatelessSecurityGroupFixture)
def test_no_conntrack_entries_related_to_stateless_sg(self):
""" Test that there is no conntrack entry related to stateless SG.
This test ensures that there is no conntrack entry for connection
that passes stateless security group.
Steps:
1. Create server with stateless SG,
2. Allow SSH to that instance
3. Make SSH connection to the instance,
4. Ensure on compute node that there are no conntrack entries
related to that connection there,
"""
host_ssh_client = topology.get_openstack_node(
hostname=self.vm.hypervisor_hostname).ssh_client
vm_ip_address = self.vm.find_fixed_ip(ip_version=4)
# Now lets make ssh connection to the vm and then check in conntrack if
# entry is there or not (it shouldn't be)
sh.execute('hostname', ssh_client=self.vm.ssh_client)
conntrack_list_result = sh.execute(
"conntrack -L --proto tcp --dport 22 --dst %s" % vm_ip_address,
ssh_client=host_ssh_client, sudo=True)
# And ensure that there is no entry found in conntrack
self.assertEqual("", conntrack_list_result.stdout)
self.assertTrue(
"0 flow entries have been shown" in conntrack_list_result.stderr)