Migrate security group logging tests

Changes from original tests:
- Adjust migrated imports and configuration.
- All tests required additional code to run on both podified and devstack setups.
- Replaced test UUIDs with unique ones.
- Migrated tester (instead of undercloud) plugin
  options for BGP jobs command validations.
- Configuration files in tests according to OSP type.
- Log files in tests according to OSP type.
- Services restart according to OSP type.
- New set service setting method verified for podified/devstack.
- New base method 'is_setup_single_node'.
- nc fixes: timeout added for the udp check; switched to short options.
- Skip unsupported extensions list CLI test on devstack.
- SGRs/ACLs amount test adjusted to the two devstack meter names
  (stateless/stateful).
- Bash command wrapper fix since sh can't source openrc.

Change-Id: Iae93a0db4a7d2f00a148c632bd6cc9dc90decd3b
This commit is contained in:
Maor Blaustein 2024-04-03 16:25:46 +03:00
parent e393f1248a
commit a78af0235f
4 changed files with 945 additions and 0 deletions

View File

@ -22,6 +22,22 @@ whitebox_neutron_plugin_options = cfg.OptGroup(
)
WhiteboxNeutronPluginOptions = [
cfg.BoolOpt('exec_on_tester',
default=True,
help='Specify whether to run validated commands on current '
'testing node or other host using ssh.'),
cfg.StrOpt('tester_ip',
default='127.0.0.1',
help='IP of host to execute validated commands.'),
cfg.StrOpt('tester_user',
default='zuul',
help='User at host to execute validated commands.'),
cfg.StrOpt('tester_pass',
default='12345678',
help='Password at host to execute validated commands.'),
cfg.StrOpt('tester_key_file',
default='',
help='Key file to access host to execute validated commands.'),
cfg.BoolOpt('node_power_change',
default=True,
help='Whether to power off/on nodes, '

View File

@ -259,6 +259,12 @@ class BaseTempestWhiteboxTestCase(base.BaseTempestTestCase):
host['is_networker'] = (
True if host['name'] in l3_agent_hosts else False)
@classmethod
def is_setup_single_node(cls):
    """Tell whether the deployment under test consists of a single node.

    The node inventory is discovered lazily on first call.
    """
    if not hasattr(cls, 'nodes'):
        cls.discover_nodes()
    single_node = len(cls.nodes) == 1
    return single_node
@classmethod
def get_pod_of_service(cls, service='neutron'):
pods_list = "oc get pods"

View File

@ -0,0 +1,922 @@
# Copyright 2024 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from pprint import pformat
import random
import socket
import time
from neutron_lib import constants as neutron_lib_constants
from neutron_tempest_plugin.api import base as api_base
from neutron_tempest_plugin.common import shell
from neutron_tempest_plugin.common import ssh
from neutron_tempest_plugin.common import utils as common_utils
from neutron_tempest_plugin import config
from neutron_tempest_plugin import exceptions as bb_exceptions
from neutron_tempest_plugin.scenario import constants as const
from oslo_log import log
from paramiko import ssh_exception as ssh_exc
from tempest.common import waiters
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
from whitebox_neutron_tempest_plugin.tests.scenario import base as wb_base
CONF = config.CONF
WB_CONF = CONF.whitebox_neutron_plugin_options
LOG = log.getLogger(__name__)
class BaseSecGroupLoggingTest(
        wb_base.BaseTempestTestCaseOvn, api_base.BaseAdminNetworkTest):
    """Common helpers for OVN security group logging scenario tests."""

    credentials = ['primary', 'admin']
    required_extensions = ['router', 'security-group', 'logging']
    # Per-deployment constants: path of the ovn-controller log holding the
    # acl_log entries, path of the ml2 plugin config file, and the suffix
    # used by the logrotate status/config files on that OSP type.
    if WB_CONF.openstack_type == 'podified':
        SG_LOG_FILE = '/var/log/containers/openvswitch/ovn-controller.log'
        # NOTE(mblue): Configuration path not needed in next gen
        ML2_CONF_FILE = ''
        rotate_service_fix = '-crond'
    elif WB_CONF.openstack_type == 'devstack':
        SG_LOG_FILE = '/opt/stack/logs/ovn-controller.log'
        ML2_CONF_FILE = '/etc/neutron/plugins/ml2/ml2_conf.ini'
        rotate_service_fix = ''
    else:
        # Fallback layout — presumably the director/tripleo containerized
        # deployment; confirm if another openstack_type is added.
        SG_LOG_FILE = '/var/log/containers/stdouts/ovn_controller.log'
        ML2_CONF_FILE = ('/var/lib/config-data/puppet-generated'
                         '/neutron/etc/neutron/plugins/ml2/ml2_conf.ini')
        rotate_service_fix = '-crond'
    # Command forcing a logrotate pass with the deployment's status/config.
    ROTATION_CHECK_CMD = (
        '/usr/sbin/logrotate -s /var/lib/logrotate/'
        'logrotate{0}.status /etc/logrotate{0}.conf').format(
        rotate_service_fix)
    # Exceptions treated as "connection dropped/blocked" during ssh attempts.
    SSH_DROP_EXCEPTIONS = (lib_exc.SSHTimeout,
                           ssh_exc.NoValidConnectionsError,
                           ssh_exc.SSHException,
                           socket.error,
                           socket.timeout,
                           ConnectionResetError,
                           EOFError)
@classmethod
def resource_setup(cls):
    """Skip the whole suite unless the 'log' service plugin is enabled.

    Checks every controller's neutron configuration for the plugin.
    """
    super(BaseSecGroupLoggingTest, cls).resource_setup()
    cls.discover_nodes()
    controllers = (n for n in cls.nodes if n['is_controller'])
    for controller in controllers:
        cls.check_service_setting(
            host=controller,
            service='neutron',
            config_files=(WB_CONF.neutron_config,),
            param='service_plugins',
            value='log',
            msg='Security group logging not supported, skipping tests.')
@classmethod
def setup_credentials(cls):
    """Select guest image, flavor and ssh user for test VMs.

    Uses the default image when it is already "advanced", otherwise
    falls back to the dedicated advanced image options.
    """
    super(BaseSecGroupLoggingTest, cls).setup_credentials()
    cls.network_client = cls.os_admin.network_client
    plugin_opts = CONF.neutron_plugin_options
    default_is_advanced = getattr(
        plugin_opts, 'default_image_is_advanced', False)
    if default_is_advanced:
        cls.flavor_ref = CONF.compute.flavor_ref
        cls.image_ref = CONF.compute.image_ref
        cls.username = CONF.validation.image_ssh_user
    else:
        cls.flavor_ref = plugin_opts.advanced_image_flavor_ref
        cls.image_ref = plugin_opts.advanced_image_ref
        cls.username = plugin_opts.advanced_image_ssh_user
@classmethod
def setup_clients(cls):
    """Cache the primary project's id for quota handling in setUp()."""
    super(BaseSecGroupLoggingTest, cls).setup_clients()
    cls.project_id = cls.os_primary.credentials.tenant_id
@classmethod
def _common_resource_setup(cls):
    """Setup resources for both classes of security group logging tests:
    with either stateless/stateful security groups.

    Creates a routed network/subnet, a keypair, and the security group
    (stateful or stateless per ``cls.is_secgrp_stateful``) used by all
    logging scenarios, plus the rules required for basic connectivity.
    """
    cls.network = cls.create_network()
    cls.subnet = cls.create_subnet(cls.network)
    router = cls.create_router_by_client()
    cls.create_router_interface(router['id'], cls.subnet['id'])
    cls.keypair = cls.create_keypair()
    # BUG FIX: inside a classmethod 'cls' is already the class, so
    # cls.__class__.__name__ named the metaclass (e.g. 'type') instead
    # of the test class; use cls.__name__ for a meaningful SG name.
    sg_kwargs = {
        'name': data_utils.rand_name(
            '{}-security-group'.format(cls.__name__))}
    if not cls.is_secgrp_stateful:
        sg_kwargs['stateful'] = False
    cls.secgrp = cls.os_primary.network_client.create_security_group(
        **sg_kwargs)['security_group']
    cls.security_groups.append(cls.secgrp)
    # ICMP allowed so ping traffic can generate "accept" log events.
    cls.icmp_rule = cls.create_pingable_secgroup_rule(
        secgroup_id=cls.secgrp['id'])
    if not cls.is_secgrp_stateful:
        # NOTE(slaweq): in case of stateless security groups, there is no
        # "related" or "established" traffic matching at all so even if
        # egress traffic to 169.254.169.254 is allowed by default SG, we
        # need to explicitly allow ingress traffic from the metadata server
        # to be able to receive responses in the guest vm
        cls.create_security_group_rule(
            security_group_id=cls.secgrp['id'],
            direction=neutron_lib_constants.INGRESS_DIRECTION,
            protocol=neutron_lib_constants.PROTO_NAME_TCP,
            remote_ip_prefix='169.254.169.254/32',
            description='metadata out')
def setUp(self):
    """Lift SG/SGR quotas for the test project and init log tracking."""
    super(BaseSecGroupLoggingTest, self).setUp()
    # Restore default quotas on cleanup, tolerating an already-gone project.
    self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                    self.network_client.reset_quotas, self.project_id)
    self.network_client.update_quotas(
        self.project_id, security_group=-1, security_group_rule=-1)
    # Per-hypervisor acl_log counters, filled by the log tracking helpers.
    self._hypervisors_counts = {}
def _create_ssh_client(self, host, username=None, key_file=None):
    """Returns ssh client.

    Default arguments return client with common credentials for tenant VM.
    In case no key_file argument specified, using inherited self.keypair,
    and if that fails, trying keys from ssh agent or trying any found key
    under ~/.ssh/ as last resort.
    """
    username = username or self.username
    # Without an explicit key file, authenticate with the test keypair.
    pkey = None if key_file else self.keypair['private_key']
    try:
        client = ssh.Client(host=host,
                            username=username,
                            pkey=pkey,
                            key_filename=key_file)
    except ssh_exc.SSHException:
        # Last resort: ssh-agent keys or any key found under ~/.ssh/.
        client = ssh.Client(host=host,
                            username=username,
                            look_for_keys=True)
    return client
@classmethod
def try_ssh_traffic(cls, ssh_client):
    """Run a trivial remote command, swallowing drop/timeout errors.

    Used to generate ssh traffic whose outcome is irrelevant — only the
    resulting log entries matter.
    """
    try:
        ssh_client.exec_command('true', timeout=30)
    except cls.SSH_DROP_EXCEPTIONS:
        pass
@staticmethod
def _try_traffic(_transport_type, fip, port, timeout=None):
    """Best-effort TCP/UDP traffic generator using nc.

    :param _transport_type: 'tcp' or 'udp'.
    :param fip: destination IP address.
    :param port: destination port.
    :param timeout: seconds passed to nc's -w option; when None the
        option is omitted (previously None rendered the invalid shell
        literal '-w None').
    """
    # NOTE(mblue): long form nc options not supported in cirros
    _udp_opt = ' -u' if _transport_type == 'udp' else ''
    _timeout_opt = ' -w {}'.format(timeout) if timeout is not None else ''
    cmd = 'echo redhatcat | nc{}{} {} {}'.format(
        _udp_opt, _timeout_opt, fip, port)
    try:
        shell.execute(cmd)
    except bb_exceptions.ShellCommandFailed:
        pass
def try_tcp_traffic(self, fip, port=9999, timeout=10):
    """Attempt a TCP connection to fip:port; success is irrelevant."""
    self._try_traffic('tcp', fip, port, timeout)
def try_udp_traffic(self, fip, port=9999, timeout=10):
    """Send a UDP datagram to fip:port; success is irrelevant."""
    self._try_traffic('udp', fip, port, timeout)
def _get_logs_and_counts(self, hypervisor_ssh, start_track):
    """Track acl_log entry counters in a hypervisor's ovn-controller log.

    With ``start_track=True`` records the counter of the last acl_log
    entry ('A'), before test traffic is sent.  With ``start_track=False``
    records the end counter ('B') and captures the B-A newest acl_log
    lines in the same ssh command, for later assertions.

    NOTE(review): assumes each acl_log line carries a running counter in
    the second '|'-separated field — confirm for the OVN version in use.
    """
    # create dictionary to track values of a hypervisor if it doesn't exist
    self._hypervisors_counts.setdefault(hypervisor_ssh.host, dict())
    # tracks A value, before test traffic sent to be logged
    if start_track:
        _track_value = int(hypervisor_ssh.exec_command(
            "sudo grep acl_log {} | tail -n1 | cut -d '|' -f 2"
            .format(self.SG_LOG_FILE), timeout=120))
        self._hypervisors_counts[hypervisor_ssh.host]['A'] = _track_value
        LOG.debug("Start log count value A on '%s' is %d",
                  hypervisor_ssh.host, _track_value)
    # tracks B value, after test traffic sent to be logged
    # (extracts logs from file right away, to avoid race conditions).
    else:
        # sed strips leading zeros so the shell arithmetic below does not
        # parse B as an octal number
        cmds_output = hypervisor_ssh.exec_command(
            ("B=$(sudo grep acl_log {0} | tail -n1 | cut -d '|' -f 2 | "
             "sed 's/^0*//') && echo $B && "
             "sudo grep acl_log {0} | tail -n $(($B-{1}))").format(
                self.SG_LOG_FILE,
                self._hypervisors_counts[hypervisor_ssh.host]['A']),
            timeout=120).splitlines()
        # save B in instance, and log in tempest the B value
        _track_value = int(cmds_output[0])
        self._hypervisors_counts[hypervisor_ssh.host]['B'] = _track_value
        LOG.debug("End log count value B on '%s' is %d",
                  hypervisor_ssh.host, _track_value)
        # parse and save logs retrieved, per hypervisor tracked counts
        self._hypervisors_counts[hypervisor_ssh.host][
            'tested_logs'] = "\n".join(cmds_output[1:])
        # log in tempest the retrieved entries amount
        _test_logs_amount = \
            self._hypervisors_counts[hypervisor_ssh.host]['B'] - \
            self._hypervisors_counts[hypervisor_ssh.host]['A']
        self._hypervisors_counts[hypervisor_ssh.host][
            'test_logs_amount'] = _test_logs_amount
        LOG.debug(
            "Retrieved %d log entries for test assertions from '%s'.",
            _test_logs_amount, hypervisor_ssh.host)
def start_track_log(self, hypervisor_ssh):
    """Set count start value (A), to track the delta of relevant log
    entries for the test.

    Usually called before test traffic is sent.  Multiple hypervisors
    (computes) can be tracked simultaneously.
    """
    self._get_logs_and_counts(hypervisor_ssh, start_track=True)
def retrieve_tracked_log(self, hypervisor_ssh):
    """Set count end value (B) and save only the test's log entries of
    the given hypervisor, used later for assertions.

    (Done in one ssh session with consecutive commands to avoid a race
    condition.)  Multiple hypervisors can be tracked simultaneously.
    """
    self._get_logs_and_counts(hypervisor_ssh, start_track=False)
def _check_log(self, should_log, pattern, fail_msg, hypervisor_ssh):
    """Assert pattern is (or is not) present in the tracked log entries.

    ``fail_msg`` must contain a '{}' placeholder filled with
    'should'/'should not' accordingly.
    """
    tested_logs = self._hypervisors_counts[
        hypervisor_ssh.host]['tested_logs']
    if should_log:
        self.assertRegex(tested_logs, pattern, fail_msg.format('should'))
    else:
        self.assertNotRegex(
            tested_logs, pattern, fail_msg.format('should not'))
def check_log_tcp(self, should_log, hypervisor_ssh, port=9999):
    """Assert TCP traffic to port is (not) in the tracked log entries."""
    self._check_log(
        should_log,
        'acl_log.*tcp.*tp_dst={}'.format(port),
        ('TCP traffic to port {} {{}} '
         'be logged in test log entries.').format(port),
        hypervisor_ssh)
def check_log_udp(self, should_log, hypervisor_ssh, port=9999):
    """Assert UDP traffic to port is (not) in the tracked log entries."""
    self._check_log(
        should_log,
        'acl_log.*udp.*tp_dst={}'.format(port),
        ('UDP traffic to port {} {{}} '
         'be logged in test log entries.').format(port),
        hypervisor_ssh)
def check_log_icmp(
        self, should_log, hypervisor_ssh, both_directions=False):
    """Assert ICMP echo request (and optionally reply) logging state."""
    fail_msg = 'ICMP {} traffic {{}} be logged in tested log entries.'
    # pairs of (logging pattern, matching failure message)
    checks = [
        (r'acl_log.*icmp.*icmp_type=8', fail_msg.format('request'))]
    if both_directions:
        checks.append(
            (r'acl_log.*icmp.*icmp_type=0', fail_msg.format('reply')))
    for pattern, msg in checks:
        self._check_log(should_log, pattern, msg, hypervisor_ssh)
def check_log_ssh(self, should_log, hypervisor_ssh):
    """Assert ssh (tcp/22) drop traffic is (not) in tracked entries."""
    # in RFE all test cases use drop verdict for ssh, therefore hardcoded
    self._check_log(
        should_log,
        r'acl_log.*verdict=drop.*tcp.*tp_dst=22',
        'ssh traffic {} be logged in tested log entries.',
        hypervisor_ssh)
def _create_server(self, scheduler_hints=None):
    """Wrapper for server creation with hypervisor info, floating ip,
    VM and hypervisor ssh clients.

    VM placed on same/different compute according to ID, using
    scheduler hints.

    :param scheduler_hints: optional nova hints dict, e.g.
        {'same_host': <server-id>} or {'different_host': <server-id>}.
    :returns: server details dict with additions: 'fip', 'ssh_client',
        'hv_base_name', 'hv_ssh_client'.
    """
    server_params = {
        'flavor_ref': self.flavor_ref,
        'image_ref': self.image_ref,
        'key_name': self.keypair['name'],
        'networks': [{'uuid': self.network['id']}],
        'security_groups': [{'name': self.secgrp['id']}],
        'name': data_utils.rand_name('{}-vm'.format(
            self.__class__.__name__))
    }
    if scheduler_hints:
        server_params['scheduler_hints'] = scheduler_hints
    server_id = self.create_server(**server_params)['server']['id']
    waiters.wait_for_server_status(
        self.os_admin.servers_client, server_id,
        const.SERVER_STATUS_ACTIVE)
    server = self.os_admin.servers_client.show_server(
        server_id)['server']
    # First port of this server on the test network gets the floating IP.
    server_port = self.client.list_ports(
        network_id=self.network['id'],
        device_id=server['id'])['ports'][0]
    server['fip'] = self.create_floatingip(
        port=server_port)['floating_ip_address']
    self.ping_ip_address(server['fip'])
    server['ssh_client'] = self._create_ssh_client(server['fip'])
    # Short hostname of the hosting hypervisor, plus a client to it.
    server['hv_base_name'] = server[
        'OS-EXT-SRV-ATTR:hypervisor_hostname'].split('.')[0]
    server['hv_ssh_client'] = self.find_node_client(
        server['hv_base_name'])
    return server
def _create_log(self,
                name='',
                resource_id='',
                event='ALL',
                description='',
                enabled=True,
                add_cleanup=True):
    """Wrapper for common creation of network log objects.

    Defaults to a random name and this class's security group.
    :returns: dictionary of log details.
    """
    if not name:
        name = data_utils.rand_name('{}-test-log'.format(
            self.__class__.__name__))
    if not resource_id:
        resource_id = self.secgrp['id']
    _log = self.create_log(name=name,
                           description=description,
                           resource_id=resource_id,
                           event=event,
                           enabled=enabled)
    if add_cleanup:
        self.addCleanup(self.admin_client.delete_log, _log['id'])
    return _log
def verify_meter_and_band_amounts(self, meter_range, meter_band_range):
    """Assert NB meter and meter-band row counts are in expected ranges."""
    counts = {}
    for table in ('meter', 'meter-band'):
        output = self.master_node_client.exec_command(
            self.nbctl + ' list {} | grep _uuid | wc -l'.format(table))
        counts[table] = int(output)
    self.assertIn(
        counts['meter'], meter_range,
        "Log meter count in NB '{}' not as expected '{}'".format(
            counts['meter'], meter_range))
    self.assertIn(
        counts['meter-band'], meter_band_range,
        "Log meter band count in NB '{}' not as expected '{}'".format(
            counts['meter-band'], meter_band_range))
# Logging tests for both stateful/stateless security group classes
def _test_log_commands(self):
    """Test verifies OpenStack log CLI commands run successfully,
    and return the expected output.

    Each command in ``cmds`` is validated against the regex at the same
    index in ``stdout_patterns``.
    """
    prefix = self.get_osp_cmd_prefix()
    log_name_1 = data_utils.rand_name('cli-test-log-1')
    log_name_2 = data_utils.rand_name('cli-test-log-2')
    cmds = [
        # 1) verify that logging extension is enabled
        'openstack extension list --fit-width | grep logging',
        # 2) make sure logs are enabled for security groups
        'openstack network loggable resources list',
        # 3) perform logging for a specific security group
        ('openstack network log create --resource-type security_group '
         '--resource {} --event ALL {}').format(
            self.secgrp['id'], log_name_1),
        # 4) show details of the first logging object
        'openstack network log show {}'.format(log_name_1),
        # 5) perform logging of accept events for all security groups
        ('openstack network log create --resource-type security_group '
         '--event ACCEPT {}').format(log_name_2),
        # 6) list existing logging objects
        'openstack network log list',
        # 7) delete both new logging objects
        'openstack network log delete {} {}'.format(
            log_name_1, log_name_2)]
    stdout_patterns = [
        # 1) verify that logging extension is enabled in output
        'Logging API Extension',
        # 2) check logs are enabled for security groups in output
        'security_group',
        # 3) check create log per SG command outputs enabled log, for all
        # events, with desired name
        '|.*Enabled.*|.*True.*|.*Event.*|.*ALL.*|.*Name.*|.*{}'.format(
            log_name_1),
        # 4) check show log per SG command outputs enabled log, for all
        # events, with desired name
        '|.*Enabled.*|.*True.*|.*Event.*|.*ALL.*|.*Name.*|.*{}'.format(
            log_name_1),
        # 5) check create log for all SGs command has enabled log,
        # for accept events, with desired name, no resource attached
        ('|.*Enabled.*|.*True.*|.*Event.*|.*ACCEPT.*|.*Name.*|.*{}.*'
         '|.*Resource.*|.*None.*|.*Target').format(log_name_2),
        # 6) both premade log names in output
        'True.*|.*{}.*|.*True.*|.*{}'.format(log_name_1, log_name_2),
        # 7) output not expected
        '']
    # NOTE(mblue): devstack: Extensions list not supported by Identity API
    if WB_CONF.openstack_type == 'devstack':
        cmds.pop(0)
        stdout_patterns.pop(0)
    # bash (not sh) is required since the command prefix sources openrc
    for cmd, ptn in zip(cmds, stdout_patterns):
        self.validate_command('bash -c "' + prefix + cmd + '"', ptn)
def _test_only_dropped_traffic_logged(self):
    """This scenario verifies that only the log entries of dropped traffic
    exist when only the "drop" event is supposed to be logged
    ('--event DROP' option).

    Traffic of "accept" events is generated but not supposed to be logged
    (Steps 1 - 3 and 8 - 12).
    This scenario also verifies rate and burst limit configuration changes
    properly in overcloud nodes
    (Steps 4 - 7).
    """
    vm_a = self._create_server()
    # 1) enable logging only for "drop" events
    self._create_log(event='DROP')
    # 2) attempt ssh to VM from undercloud ("drop" event),
    # this ensures an initial log entry exists, with needed count value.
    self.try_ssh_traffic(vm_a['ssh_client'])
    # 3) tracks count value of last log entry, from hypervisor,
    # before sending test traffic to be logged.
    self.start_track_log(vm_a['hv_ssh_client'])
    # 4) configure higher rate and burst limits on all controllers
    # default preset (min possible): rate limit 100, burst limit 25
    burst_val = random.randint(30, 150)
    rate_val = random.randint(120, 300)
    self.set_service_setting(file=self.ML2_CONF_FILE,
                             section='network_log',
                             param='burst_limit',
                             value=burst_val)
    self.set_service_setting(file=self.ML2_CONF_FILE,
                             section='network_log',
                             param='rate_limit',
                             value=rate_val)
    # 5) restart neutron api on all controllers simultaneously
    # (podified applies config without an explicit service restart)
    if not WB_CONF.openstack_type == 'podified':
        if WB_CONF.openstack_type == 'devstack':
            service_ptn = 'q svc'
        else:
            service_ptn = 'neutron api'
        for node in self.nodes:
            if node['is_controller']:
                # NOTE(mblue): if reset fails on multinode, consider
                # wait_until_active=False for a more simultaneous reset
                self.reset_node_service(service_ptn, node['client'])
    # 6) verify rate/burst limits set on master/leader controller meter
    wait_err = ('Timed out: burst/rate limits not configured properly '
                'on master/leader controller.')
    common_utils.wait_until_true(
        lambda: self.validate_command(
            '{} list meter-band'.format(self.nbctl),
            pattern=r'burst_size[ \t]+:[ \t]+{}'.format(burst_val),
            ssh_client=self.master_node_client,
            ret_bool_pattern=True),
        timeout=180,
        sleep=10,
        exception=RuntimeError(wait_err))
    # extra command call needed due to regex limitation
    self.validate_command(
        '{} list meter-band'.format(self.nbctl),
        pattern=r'rate[ \t]+:[ \t]+{}'.format(rate_val),
        ssh_client=self.master_node_client)
    # 7) verify openvswitch meter configuration on compute of VM
    self.validate_command(
        'sudo ovs-ofctl dump-meters br-int -O OpenFlow15',
        pattern='type=drop rate={} burst_size={}'.format(
            rate_val, burst_val),
        ssh_client=vm_a['hv_ssh_client'])
    # 8) ping VM from undercloud (supposed to pass but not be logged)
    self.ping_ip_address(vm_a['fip'])
    # 9) ssh VM from undercloud (supposed to drop traffic and be logged)
    self.try_ssh_traffic(vm_a['ssh_client'])
    # 10) get last log entry count value, and relevant log entries
    # from compute, after sending traffic to be logged
    self.retrieve_tracked_log(vm_a['hv_ssh_client'])
    # 11) verify ICMP isn't logged at all (allowed "accept" event traffic)
    self.check_log_icmp(
        should_log=False, hypervisor_ssh=vm_a['hv_ssh_client'])
    # 12) verify ssh attempts are logged (disallowed "drop" event traffic)
    self.check_log_ssh(
        should_log=True, hypervisor_ssh=vm_a['hv_ssh_client'])
def _test_only_accepted_traffic_logged(self):
    """This test verifies that only the log entries of allowed traffic
    exist when only the "allow" event is supposed to be logged
    (--event ACCEPT option).

    Traffic of "drop" events is generated but not supposed to be logged.
    (Steps 1 - 8)
    This scenario also verifies that both directions of traffic are logged
    by same SGR, checking ICMP echo request and reply in logs.
    (Step 7)
    This scenario also verifies that when adding SGRs ingress
    for dns and ssh traffic, single meter and meter band exist in NB,
    and all SGRs/ACLs are linked to same meter in NB.
    (Step 9 - 12)
    This scenario also verifies configured rate and burst limits
    are enforced properly, when excessive traffic amount matches.
    (Steps 13 - 17).
    The scenario verifies no duplicate entries on same/different computes.
    Test runs twice, once 2 VMs on same compute,
    once on different computes, same network and subnet used for VMs.
    Test sends a constant amount of ICMP requests from the first VM to
    the second VM locally in same subnet.
    Afterwards verifies a constant amount of log entries was written
    for the ICMP traffic, in same/different compute, checking there
    are no duplicated log entries.
    Duplicate entries checked using from-lport/to-lport and request/reply,
    each of 4 possible options combination should have constant amount
    (Only in different computes test, amount is split in specific way
    described in later comments of test code).
    (Step 18 - 20)
    """
    vm_a = self._create_server()
    # 1) enable logging only for "accept" events
    self._create_log(event='ACCEPT')
    # 2) ping VM from undercloud ("accept" event),
    # this ensures an initial log entry exists, with needed count value.
    self.ping_ip_address(vm_a['fip'])
    # 3) tracks count value of last log entry, from hypervisor,
    # before sending test traffic to be logged.
    self.start_track_log(vm_a['hv_ssh_client'])
    # 4) ping VM from undercloud (supposed to pass and be logged)
    self.ping_ip_address(vm_a['fip'])
    # 5) ssh VM from undercloud (supposed to drop traffic, not be logged)
    self.try_ssh_traffic(vm_a['ssh_client'])
    # 6) get last log entry count value, and relevant log entries
    # from compute, after sending traffic to be logged
    self.retrieve_tracked_log(vm_a['hv_ssh_client'])
    # 7) verify ICMP is logged in both directions ("accept" traffic)
    self.check_log_icmp(
        should_log=True, hypervisor_ssh=vm_a['hv_ssh_client'],
        both_directions=True)
    # 8) verify ssh attempts aren't logged ("drop" event traffic)
    self.check_log_ssh(
        should_log=False, hypervisor_ssh=vm_a['hv_ssh_client'])
    # 9) add temporary SGR allowing ingress tcp port 22 (ssh traffic)
    ssh_sgr_in = self.create_security_group_rule(
        security_group_id=self.secgrp['id'],
        protocol='tcp',
        direction='ingress',
        port_range_min=22,
        port_range_max=22)
    self.addCleanup(
        self.admin_client.delete_security_group_rule, ssh_sgr_in['id'])
    # 10) add temporary SGR allowing ingress udp port 53 (dns traffic)
    dns_sgr = self.create_security_group_rule(
        security_group_id=self.secgrp['id'],
        protocol='udp',
        direction='ingress',
        port_range_min=53,
        port_range_max=53)
    self.addCleanup(
        self.admin_client.delete_security_group_rule, dns_sgr['id'])
    # fetch updated ingress SGRs in logging SG
    sg_rules = self.admin_client.show_security_group(
        self.secgrp['id'])['security_group']['security_group_rules']
    sg_rules_ingress = [rule for rule in sg_rules
                        if rule['direction'] == 'ingress']
    # 11) verify single meter and meter band quantity in NB
    self.verify_meter_and_band_amounts((1, 2), (1, 2))
    # 12) verify same logging SGRs/ACLs amount linked to relevant meter
    # NOTE(mblue): devstack ACLs for stateless SG with another meter name
    if WB_CONF.openstack_type == 'devstack' and \
            not self.is_secgrp_stateful:
        meter_postfix = '.log_stateless"'
    else:
        meter_postfix = '.log"'
    log_acls_output = self.master_node_client.exec_command(
        self.nbctl + ' find acl log=true | grep -E '
        "'meter.*:.*(log_meter|test_log{})'".format(meter_postfix))
    # needed SGR amount may change due to stateful/stateless setting
    self.assertEqual(len(sg_rules_ingress),
                     len(log_acls_output.splitlines()),
                     "Not all logging SGRs/ACLs linked to relevant meter")
    # 13) fetch current configured rate and burst limits
    burst_limit = int(self.validate_command(
        'sudo crudini --get {} network_log burst_limit'.format(
            self.ML2_CONF_FILE),
        ssh_client=self.master_node_client))
    rate_limit = int(self.validate_command(
        'sudo crudini --get {} network_log rate_limit'.format(
            self.ML2_CONF_FILE),
        ssh_client=self.master_node_client))
    # 14) send excessive ICMP requests to VM for 1 second
    self.start_track_log(vm_a['hv_ssh_client'])
    self.validate_command(
        'sudo ping {} -i 0.002 -c 500 | tail -n4'.format(vm_a['fip']),
        pattern=r' 0% packet loss')
    self.retrieve_tracked_log(vm_a['hv_ssh_client'])
    # 15) verify log entries amount equals to:
    # rate limit + burst limit (up to 10% offset allowed)
    limits_fail_msg = "rate or burst log limits not enforced correctly"
    quick_test_target = burst_limit + rate_limit
    quick_test_logs_amount = self._hypervisors_counts[
        vm_a['hv_ssh_client'].host]['test_logs_amount']
    self.assertIn(quick_test_logs_amount,
                  range(round(0.9 * quick_test_target),
                        round(1.1 * quick_test_target)),
                  limits_fail_msg)
    # 16) send excessive ICMP requests to VM for 60 seconds
    self.start_track_log(vm_a['hv_ssh_client'])
    self.validate_command(
        'sudo ping {} -i 0.005 -c 12000 | tail -n4'.format(vm_a['fip']),
        pattern=r' 0% packet loss',
        timeout=70)
    self.retrieve_tracked_log(vm_a['hv_ssh_client'])
    # 17) verify log entries amount equals to:
    # rate limit * 60 + burst limit (up to 2% offset allowed)
    long_test_target = rate_limit * 60 + burst_limit
    long_test_logs_amount = self._hypervisors_counts[
        vm_a['hv_ssh_client'].host]['test_logs_amount']
    self.assertIn(long_test_logs_amount,
                  range(round(0.98 * long_test_target),
                        round(1.02 * long_test_target)),
                  limits_fail_msg)
    # NOTE(mblue): skip multi-compute test when setup is single node
    for vms_same_compute in (True, False):
        if self.is_setup_single_node() and not vms_same_compute:
            continue
        self._verify_duplicate_entries(vms_same_compute, vm_a)
def _verify_duplicate_entries(self, same_compute, vm_a):
    """Steps 18 - 20 of the accepted-traffic scenario: create a second
    VM on the same/different compute, ping it from the first VM, and
    verify each direction/ICMP-type combination is logged exactly
    ``ping_amount`` times (i.e. no duplicated acl_log entries).
    """
    # 18) create second VM on the same/different compute node
    compute_hint_str = 'same' if same_compute else 'different'
    vm_b = self._create_server(
        scheduler_hints={'{}_host'.format(compute_hint_str): vm_a['id']})
    vm_b_internal_ip = list(vm_b['addresses'].values())[0][0]['addr']
    # NOTE(review): fixed sleep — presumably letting flows/log state
    # settle before tracking starts; confirm necessity.
    time.sleep(5)
    # 19) track logs during ping, 10 times from VM 1 to VM 2
    # (log file located on compute)
    self.start_track_log(vm_a['hv_ssh_client'])
    if not same_compute:
        self.start_track_log(vm_b['hv_ssh_client'])
    ping_amount = 10
    self.check_remote_connectivity(
        vm_a['ssh_client'], vm_b_internal_ip, ping_count=ping_amount)
    self.retrieve_tracked_log(vm_a['hv_ssh_client'])
    if not same_compute:
        self.retrieve_tracked_log(vm_b['hv_ssh_client'])
    # 20) verify each possible direction and type logged as ping amount
    icmp_types = ('0', '8')  # reply/request
    directions = ('to-lport', 'from-lport')  # to/from logical switch
    checked_options = [(d, t) for d in directions for t in icmp_types]
    entries_list = [self._hypervisors_counts[
        vm_a['hv_ssh_client'].host]['tested_logs'].splitlines()]
    if not same_compute:
        entries_list.append(self._hypervisors_counts[
            vm_b['hv_ssh_client'].host]['tested_logs'].splitlines())
    err_msg = (
        '\nlog entries amount of ICMP type "{}", direction "{}", '
        'not according to ping amount, which is {}.\n'
        'Notice code comments, tempest.log for further debugging.\n'
        'These are the log_acl entries captured while ping from '
        'VM 1 to VM 2:\n'
        '\n\n{}\n\n')
    for i, entries in enumerate(entries_list):
        # different computes case: each compute holds half logs like so:
        # VM A's compute -> request with to-lport, reply with from-lport.
        # (compute of client VM)
        # VM B's compute -> request with from-lport, reply with to-lport.
        # (compute of server VM)
        # NOTE(review): checked_options[::3] selects (to-lport, reply=0)
        # and (from-lport, request=8), which looks swapped relative to
        # the mapping described above — confirm the intended slices.
        if not same_compute:
            checked_options_filt = \
                checked_options[::3] if i == 0 else checked_options[1:3]
        else:
            checked_options_filt = checked_options
        for direction, icmp_type in checked_options_filt:
            entries_with_opts_count = len([
                ent for ent in entries if
                'direction={}'.format(direction) in ent and
                'icmp_type={}'.format(icmp_type) in ent])
            LOG.debug(
                ('same_computes=%s, hv=%d, direction=%s, icmp_type=%s '
                 '-> entries count %d'),
                str(same_compute), i, direction, icmp_type,
                entries_with_opts_count)
            self.assertEqual(
                entries_with_opts_count,
                ping_amount,
                err_msg.format(
                    icmp_type, direction, ping_amount, pformat(entries)))
    def _test_dropped_and_accepted_traffic_logged(self):
        """This scenario verifies that the log entries of both dropped and
        accepted events exist when both "drop" and "accept" events are
        supposed to be logged (--event ALL option).
        This scenario also verifies indications of a few protocols are logged:
        using TCP, UDP, SSH and ICMP traffic
        (Steps 1 - 11).
        This scenario also verifies log file rotation done when max size
        configured per log file reached
        (Steps 12 - 16).
        This scenario verifies allowed and unallowed traffic (ICMP and SSH/UDP)
        aren't logged, after log disabled, also after log deleted.
        (Steps 18 - 23).
        Northbound DB checks used to verify ACL log value enabled/disabled,
        and meter, meter band amounts when log exists and removed.
        (17, 19, 24).
        """
        vm_a = self._create_server()
        # On podified setups logrotate runs inside the 'logrotate_crond'
        # container, so rotation commands are wrapped with 'podman exec';
        # on devstack they run directly on the hypervisor host.
        if WB_CONF.openstack_type == 'devstack':
            rotate_prefix = 'sudo '
        else:
            rotate_prefix = 'sudo podman exec logrotate_crond '
        # 1) enable logging for "all" events
        local_log = self._create_log(event='ALL', add_cleanup=False)
        # 2) attempt ssh to VM from undercloud ("drop" event),
        # this ensures an initial log entry exists, with needed count value.
        self.try_ssh_traffic(vm_a['ssh_client'])
        # 3) tracks count value of last log entry, from hypervisor,
        # before sending test traffic to be logged.
        self.start_track_log(vm_a['hv_ssh_client'])
        # 4) ping VM from undercloud (supposed to pass and be logged)
        self.ping_ip_address(vm_a['fip'])
        # 5) ssh VM from undercloud (supposed to drop traffic and be logged)
        self.try_ssh_traffic(vm_a['ssh_client'])
        # 6) send dropped TCP and UDP traffic port 9999
        self.try_tcp_traffic(vm_a['fip'])
        self.try_udp_traffic(vm_a['fip'])
        # 7) get last log entry count value, and relevant log entries
        # from compute, after sending traffic to be logged
        self.retrieve_tracked_log(vm_a['hv_ssh_client'])
        # 8) verify ICMP is logged (allowed "accept" event traffic)
        self.check_log_icmp(
            should_log=True, hypervisor_ssh=vm_a['hv_ssh_client'])
        # 9) verify TCP traffic with port 9999 is indicated in logs
        self.check_log_tcp(
            should_log=True, hypervisor_ssh=vm_a['hv_ssh_client'])
        # 10) verify UDP traffic with port 9999 is indicated in logs
        self.check_log_udp(
            should_log=True, hypervisor_ssh=vm_a['hv_ssh_client'])
        # 11) verify ssh attempts are logged (disallowed "drop" event traffic)
        self.check_log_ssh(
            should_log=True, hypervisor_ssh=vm_a['hv_ssh_client'])
        # 12) fetch 'maxsize' value which configures log rotation threshold
        # ('; true' keeps exit status 0 when no 'maxsize' line is present,
        # so validate_command does not fail on setups without it)
        maxsize_si = self.validate_command(
            rotate_prefix + 'grep maxsize /etc/logrotate{}.conf; true'.format(
                self.rotate_service_fix),
            ssh_client=vm_a['hv_ssh_client']).rstrip().split(' ')[-1]
        # if setup type supports log rotation due to size, then test
        if maxsize_si:
            # convert to bytes without SI prefixes k/M/G (logrotate SI options)
            # NOTE: logrotate size suffixes are binary multiples,
            # i.e. k=2**10, M=2**20, G=2**30 bytes
            try:
                power = 10 * ('kMG'.index(maxsize_si[-1]) + 1)
                maxsize_num = maxsize_si[:-1]
                maxsize = int(maxsize_num) * 2 ** power
            # no SI symbol
            except ValueError:
                maxsize = int(maxsize_si)
            # one byte above the threshold guarantees rotation is triggered
            overflow_size = maxsize + 1
            # 13) force check if log rotation needed (before test start)
            self.validate_command(
                rotate_prefix + self.ROTATION_CHECK_CMD,
                ssh_client=vm_a['hv_ssh_client'])
            # current active log file size in bytes (before size overflow)
            active_log_pre_overflow = int(self.validate_command(
                'sudo wc -c {}'.format(self.SG_LOG_FILE),
                ssh_client=vm_a['hv_ssh_client']).split()[0])
            # 14) adding bytes into active log file to pass 'maxsize' value
            self.validate_command(('''python3 -c "print('A'*{})" | '''
                                   'sudo tee -a {} 1> /dev/null').format(
                overflow_size, self.SG_LOG_FILE),
                ssh_client=vm_a['hv_ssh_client'])
            # 15) force check if log rotation is needed
            self.validate_command(
                rotate_prefix + self.ROTATION_CHECK_CMD,
                ssh_client=vm_a['hv_ssh_client'])
            # current active log file size in bytes (after size overflow)
            active_log_post_overflow = int(self.validate_command(
                'sudo wc -c {}'.format(self.SG_LOG_FILE),
                ssh_client=vm_a['hv_ssh_client']).split()[0])
            # new rotated log file size in bytes (after size overflow)
            rotated_log_post_overflow = int(self.validate_command(
                'sudo wc -c {}'.format(self.SG_LOG_FILE + '.1'),
                ssh_client=vm_a['hv_ssh_client']).split()[0])
            # 16) verify log rotation successful:
            # - current new active log has a small size (below 'offset').
            # - current rotated log similar to previous active log size
            #   (up until additional 'offset' bytes of logs allowed).
            offset = 1500
            self.assertLess(active_log_post_overflow, offset,
                            "Active log after rotation too big")
            self.assertIn(
                rotated_log_post_overflow,
                range(active_log_pre_overflow + overflow_size,
                      active_log_pre_overflow + overflow_size + offset),
                "Rotated log size much different than expected")
            # this ensures an initial DROP log entry exists (after rotation)
            self.try_ssh_traffic(vm_a['ssh_client'])
        # 17) verify single meter and meter band in NBDB
        self.verify_meter_and_band_amounts((1, 2), (1, 2))
        # 18) test no logging done, after log disabled, and after log deleted
        # NB DB query: find the ACL created for the ICMP SG rule by its
        # 'neutron:security_group_rule_id' external id
        icmp_acl_cmd = self.nbctl + \
            (r' find acl "external_ids:\"neutron:security_group_rule_id\""='
             '"{}"'.format(self.icmp_rule['id']))
        # iteration 0 disables the log object, iteration 1 deletes it;
        # after each operation the same "nothing is logged" checks run
        for _request_op in range(2):
            if _request_op == 0:
                # 19) verify log value of ACL in NB, before and after disable
                icmp_acl_output = self.master_node_client.exec_command(
                    icmp_acl_cmd)
                self.assertRegex(
                    icmp_acl_output, r'log[ \t]*:[ \t]*true',
                    'ACL log value in NB not true, when SGR enabled')
                self.admin_client.update_log(local_log['id'], enabled=False)
                icmp_acl_output = self.master_node_client.exec_command(
                    icmp_acl_cmd)
                self.assertRegex(
                    icmp_acl_output, r'log[ \t]*:[ \t]*false',
                    'ACL log value in NB not false, when SGR disabled')
            else:
                self.admin_client.delete_log(local_log['id'])
            # 20) track only logging entries after disable/deletion
            self.start_track_log(vm_a['hv_ssh_client'])
            # 21) send disallowed SSH/UDP traffic, and allowed ICMP traffic
            self.try_ssh_traffic(vm_a['ssh_client'])
            self.try_udp_traffic(vm_a['fip'])
            self.ping_ip_address(vm_a['fip'])
            # 22) stop tracking logs
            self.retrieve_tracked_log(vm_a['hv_ssh_client'])
            # 23) verify no SSH/UDP/ICMP traffic logged
            self.check_log_ssh(
                should_log=False, hypervisor_ssh=vm_a['hv_ssh_client'])
            self.check_log_udp(
                should_log=False, hypervisor_ssh=vm_a['hv_ssh_client'])
            self.check_log_icmp(
                should_log=False, hypervisor_ssh=vm_a['hv_ssh_client'])
        # 24) verify meter / meter band don't exist in NB after log deletion
        self.verify_meter_and_band_amounts((0,), (0,))
class StatefulSecGroupLoggingTest(BaseSecGroupLoggingTest):
    """Security group logging scenarios run with a stateful security group.

    Each test method delegates to the shared ``_test_*`` scenario defined
    on the base class; this class only selects stateful security-group
    behavior and assigns its own idempotent IDs.
    """

    # Run all shared scenarios against a stateful security group.
    is_secgrp_stateful = True

    @classmethod
    def resource_setup(cls):
        """Create the resources shared by all tests in this class."""
        super(StatefulSecGroupLoggingTest, cls).resource_setup()
        cls._common_resource_setup()

    @decorators.idempotent_id('ad5f67e0-48d2-4af5-8bc2-b0a8973d4fba')
    def test_log_commands(self):
        """Delegate to the shared log commands scenario."""
        self._test_log_commands()

    @decorators.idempotent_id('1cb767a8-74c3-42a4-a705-c9f9622f59c9')
    def test_only_dropped_traffic_logged(self):
        """Delegate to the shared dropped-traffic-only logging scenario."""
        self._test_only_dropped_traffic_logged()

    @decorators.idempotent_id('d1b4cea8-e2cb-4d90-a135-ea41a5fb212a')
    def test_only_accepted_traffic_logged(self):
        """Delegate to the shared accepted-traffic-only logging scenario."""
        self._test_only_accepted_traffic_logged()

    @decorators.idempotent_id('003f7f7c-f8b8-4bfe-a24a-12e1c2a14cb9')
    def test_dropped_and_accepted_traffic_logged(self):
        """Delegate to the shared dropped-and-accepted logging scenario."""
        self._test_dropped_and_accepted_traffic_logged()
class StatelessSecGroupLoggingTest(BaseSecGroupLoggingTest):
    """Security group logging scenarios run with a stateless security group.

    Each test method delegates to the shared ``_test_*`` scenario defined
    on the base class; this class only selects stateless security-group
    behavior and assigns its own idempotent IDs.
    """

    # The 'stateful-security-group' API extension is required to be able
    # to create security groups with stateful=False.
    required_extensions = BaseSecGroupLoggingTest.required_extensions + \
        ['stateful-security-group']
    # Run all shared scenarios against a stateless security group.
    is_secgrp_stateful = False

    @classmethod
    def resource_setup(cls):
        """Create the resources shared by all tests in this class."""
        super(StatelessSecGroupLoggingTest, cls).resource_setup()
        cls._common_resource_setup()

    @decorators.idempotent_id('65319597-a504-4f29-947b-ae57f4ac351c')
    def test_log_commands(self):
        """Delegate to the shared log commands scenario."""
        self._test_log_commands()

    @decorators.idempotent_id('7bab2764-2aac-49da-bd0f-161128695bfb')
    def test_only_dropped_traffic_logged(self):
        """Delegate to the shared dropped-traffic-only logging scenario."""
        self._test_only_dropped_traffic_logged()

    @decorators.idempotent_id('2efc5a0c-859c-4a35-b658-52d323c46fef')
    def test_only_accepted_traffic_logged(self):
        """Delegate to the shared accepted-traffic-only logging scenario."""
        self._test_only_accepted_traffic_logged()

    @decorators.idempotent_id('e26ccbf1-4b4a-4812-9e6f-9b59cf196540')
    def test_dropped_and_accepted_traffic_logged(self):
        """Delegate to the shared dropped-and-accepted logging scenario."""
        self._test_dropped_and_accepted_traffic_logged()

View File

@ -101,6 +101,7 @@
- l3-ha
- l3-ndp-proxy
- l3_agent_scheduler
- logging
- metering
- multi-provider
- net-mtu