Merge "Add test_extra_dhcp_opts_logs_unsupported_options"

This commit is contained in:
Zuul 2021-09-01 08:59:33 +00:00 committed by Gerrit Code Review
commit cac42ed346
5 changed files with 151 additions and 6 deletions

View File

@ -55,6 +55,7 @@ find_port = _client.find_port
list_ports = _client.list_ports list_ports = _client.list_ports
get_port_extra_dhcp_opts = _client.get_port_extra_dhcp_opts get_port_extra_dhcp_opts = _client.get_port_extra_dhcp_opts
create_port = _client.create_port create_port = _client.create_port
update_port = _client.update_port
delete_port = _client.delete_port delete_port = _client.delete_port
list_subnets = _client.list_subnets list_subnets = _client.list_subnets
list_subnet_cidrs = _client.list_subnet_cidrs list_subnet_cidrs = _client.list_subnet_cidrs

View File

@ -184,6 +184,11 @@ def create_port(client=None, **params):
return port['port'] return port['port']
def update_port(port_id, client=None, **params):
port = neutron_client(client).update_port(port_id, body={'port': params})
return port['port']
def delete_port(port, client=None): def delete_port(port, client=None):
try: try:
neutron_client(client).delete_port(port) neutron_client(client).delete_port(port)

View File

@ -51,3 +51,5 @@ verify_osp_version = _topology.verify_osp_version
NeutronNovaResponse = _neutron.NeutronNovaResponse NeutronNovaResponse = _neutron.NeutronNovaResponse
NeutronNovaResponseReader = _neutron.NeutronNovaResponseReader NeutronNovaResponseReader = _neutron.NeutronNovaResponseReader
read_neutron_nova_responses = _neutron.read_neutron_nova_responses read_neutron_nova_responses = _neutron.read_neutron_nova_responses
assert_ovn_unsupported_dhcp_option_messages = (
_neutron.assert_ovn_unsupported_dhcp_option_messages)

View File

@ -18,12 +18,17 @@ import datetime
import re import re
import typing import typing
from oslo_log import log
import tobiko import tobiko
from tobiko.openstack import neutron from tobiko.openstack import neutron
from tobiko.openstack.topology import _topology from tobiko.openstack.topology import _topology
from tobiko.shell import files from tobiko.shell import files
LOG = log.getLogger(__name__)
class NeutronNovaResponse(typing.NamedTuple): class NeutronNovaResponse(typing.NamedTuple):
hostname: str hostname: str
timestamp: float timestamp: float
@ -38,13 +43,12 @@ class NeutronNovaResponse(typing.NamedTuple):
return self.timestamp < other.timestamp return self.timestamp < other.timestamp
class NeutronNovaResponseReader(tobiko.SharedFixture): class NeutronNovaCommonReader(tobiko.SharedFixture):
log_digger: files.MultihostLogFileDigger log_digger: files.MultihostLogFileDigger
groups = ['controller'] groups: typing.List[str]
message_pattern = r'Nova event response: ' message_pattern: str
datetime_pattern = re.compile(r'(\d{4}-\d{2}-\d{2} [0-9:.]+) .+') datetime_pattern = re.compile(r'(\d{4}-\d{2}-\d{2} [0-9:.]+) .+')
service_name = neutron.SERVER service_name = neutron.SERVER
responses: tobiko.Selection[NeutronNovaResponse]
def setup_fixture(self): def setup_fixture(self):
self.log_digger = self.useFixture( self.log_digger = self.useFixture(
@ -55,13 +59,22 @@ class NeutronNovaResponseReader(tobiko.SharedFixture):
self.read_responses() self.read_responses()
def _get_log_timestamp(self, def _get_log_timestamp(self,
log: str) -> float: log_line: str) -> float:
found = self.datetime_pattern.match(log) found = self.datetime_pattern.match(log_line)
if not found: if not found:
return 0.0 return 0.0
return datetime.datetime.strptime( return datetime.datetime.strptime(
found.group(1), "%Y-%m-%d %H:%M:%S.%f").timestamp() found.group(1), "%Y-%m-%d %H:%M:%S.%f").timestamp()
def read_responses(self):
raise NotImplementedError
class NeutronNovaResponseReader(NeutronNovaCommonReader):
groups = ['controller']
message_pattern = r'Nova event response: '
responses: tobiko.Selection[NeutronNovaResponse]
def read_responses(self) \ def read_responses(self) \
-> tobiko.Selection[NeutronNovaResponse]: -> tobiko.Selection[NeutronNovaResponse]:
responses = tobiko.Selection[NeutronNovaResponse]() responses = tobiko.Selection[NeutronNovaResponse]()
@ -100,3 +113,74 @@ def read_neutron_nova_responses(
if attributes and responses: if attributes and responses:
responses = responses.with_attributes(**attributes) responses = responses.with_attributes(**attributes)
return responses return responses
class UnsupportedDhcpOptionMessage(typing.NamedTuple):
port_uuid: str
unsupported_dhcp_option: str
timestamp: float
line: str
def __lt__(self, other):
return self.timestamp < other.timestamp
@neutron.skip_unless_is_ovn()
class OvnUnsupportedDhcpOptionReader(NeutronNovaCommonReader):
groups = ['controller']
message_pattern = (
'The DHCP option .* on port .* is not suppported by OVN, ignoring it')
responses: tobiko.Selection[UnsupportedDhcpOptionMessage]
def read_responses(self) \
-> tobiko.Selection[UnsupportedDhcpOptionMessage]:
def _get_port_uuid(line):
port_pattern = 'on port (.*) is not suppported by OVN'
return re.findall(port_pattern, line)[0]
def _get_dhcp_option(line):
dhcp_opt_pattern = 'The DHCP option (.*) on port'
return re.findall(dhcp_opt_pattern, line)[0]
responses = tobiko.Selection[UnsupportedDhcpOptionMessage]()
message_pattern = re.compile(self.message_pattern)
for _, line in self.log_digger.find_lines(
new_lines=hasattr(self, 'responses')):
found = message_pattern.search(line)
assert found is not None
response = UnsupportedDhcpOptionMessage(
line=line,
timestamp=self._get_log_timestamp(line[:found.start()]),
port_uuid=_get_port_uuid(line),
unsupported_dhcp_option=_get_dhcp_option(line))
responses.append(response)
responses.sort()
if hasattr(self, 'responses'):
self.responses.extend(responses)
else:
self.responses = responses
return responses
def assert_ovn_unsupported_dhcp_option_messages(
reader: OvnUnsupportedDhcpOptionReader = None,
new_lines=True,
unsupported_options: typing.Optional[typing.List] = None,
**attributes):
if reader is None:
reader = tobiko.setup_fixture(OvnUnsupportedDhcpOptionReader)
# find new logs that match the pattern
responses = reader.read_responses()
if not new_lines:
responses = reader.responses
if attributes and responses:
responses = responses.with_attributes(**attributes)
# assert one line matches per unsupported dhcp option
test_case = tobiko.get_test_case()
for unsupported_option in unsupported_options or []:
messages_unsupported_option = responses.with_attributes(
unsupported_dhcp_option=unsupported_option)
test_case.assertEqual(1, len(messages_unsupported_option))
LOG.debug('Found one match for unsupported dhcp option '
f'{unsupported_option}')

View File

@ -19,6 +19,7 @@ import re
import typing import typing
import netaddr import netaddr
from neutron_lib import constants
from oslo_log import log from oslo_log import log
import testtools import testtools
@ -33,6 +34,8 @@ from tobiko.openstack import topology
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
IPV4 = constants.IP_VERSION_4
IPV6 = constants.IP_VERSION_6
class PortTest(testtools.TestCase): class PortTest(testtools.TestCase):
@ -220,3 +223,53 @@ class ExtraDhcpOptsPortTest(PortTest):
re.search(r'^search\s+{domain}$'.format(domain=domain), re.search(r'^search\s+{domain}$'.format(domain=domain),
vm_resolv_conf, vm_resolv_conf,
re.MULTILINE)) re.MULTILINE))
@neutron.skip_unless_is_ovn()
class ExtraDhcpOptsPortLoggingTest(testtools.TestCase):
stack = tobiko.required_setup_fixture(stacks.NetworkStackFixture)
def test_extra_dhcp_opts_logs_unsupported_options(self):
# initialize logs that match the pattern
topology.assert_ovn_unsupported_dhcp_option_messages()
wrong_ipv4_option = 'wrong-ipv4-option'
wrong_ipv6_option = 'bananas'
a_valid_ipv4_option_used_for_ipv6 = 'log-server'
extra_dhcp_opts = [
{'opt_value': '1.1.1.1',
'opt_name': a_valid_ipv4_option_used_for_ipv6,
'ip_version': IPV6},
{'opt_value': 'ipv6.domain',
'opt_name': 'domain-search',
'ip_version': IPV6},
{'opt_value': '1600',
'opt_name': 'mtu',
'ip_version': IPV4},
{'opt_value': 'blablabla',
'opt_name': wrong_ipv4_option,
'ip_version': IPV4}]
# create port with extra-dhcp-opts
port = neutron.create_port(**{'network_id': self.stack.network_id,
'extra_dhcp_opts': extra_dhcp_opts})
self.addCleanup(neutron.delete_port, port['id'])
# find new logs that match the pattern
invalid_options = [wrong_ipv4_option,
a_valid_ipv4_option_used_for_ipv6]
# assert every invalid dhcp option is logged
topology.assert_ovn_unsupported_dhcp_option_messages(
unsupported_options=invalid_options,
port_uuid=port['id'])
extra_dhcp_opts.append({'opt_value': '1.1.1.1',
'opt_name': wrong_ipv6_option,
'ip_version': IPV6})
# update port with new extra-dhcp-opts
port = neutron.update_port(port['id'],
**{'extra_dhcp_opts': extra_dhcp_opts})
invalid_options.append(wrong_ipv6_option)
# assert every invalid dhcp option is logged
topology.assert_ovn_unsupported_dhcp_option_messages(
unsupported_options=invalid_options,
port_uuid=port['id'])