Merge "Exclude detection of leaked OVS resources on undercloud node"

This commit is contained in:
Zuul 2021-09-09 21:06:05 +00:00 committed by Gerrit Code Review
commit 6a3dc304f9
1 changed files with 45 additions and 30 deletions

View File

@ -1,7 +1,9 @@
from __future__ import absolute_import
import collections
import json
import re
import typing
from oslo_log import log
@ -255,35 +257,48 @@ def test_ovs_bridges_mac_table_size():
expected_mac_table_size)
def test_ovs_namespaces_are_absent():
ovs_specific_namespaces = ['qrouter', 'qdhcp', 'snat', 'fip']
for node in topology.list_openstack_nodes():
if node.name.startswith('undercloud'):
continue
namespaces = ip.list_network_namespaces(
ssh_client=node.ssh_client, sudo=True)
namespaces = [namespace
for namespace in namespaces
if any(namespace.startswith(prefix)
for prefix in ovs_specific_namespaces)]
test_case = tobiko.get_test_case()
test_case.assertEqual(
[], namespaces,
f"Unexpected namespace found on {node.name}: {*namespaces,}")
OPENSTACK_NODE_GROUP = re.compile(r'(compute|controller|overcloud)')
OVS_NAMESPACE = re.compile(r'(qrouter.*|qdhcp.*|snat.*|fip.*)')
def test_ovs_interfaces_are_absent():
ovs_specific_interfaces = ['qvo', 'qvb', 'qbr', 'qr', 'qg', 'fg', 'sg']
for node in topology.list_openstack_nodes():
if node.name.startswith('undercloud'):
continue
interfaces = ip.list_network_interfaces(
ssh_client=node.ssh_client, sudo=True)
interfaces = [interface
for interface in interfaces
if any(interface.startswith(prefix)
for prefix in ovs_specific_interfaces)]
test_case = tobiko.get_test_case()
test_case.assertEqual(
[], interfaces,
f"Unexpected interface found on {node.name}: {*interfaces,}")
def test_ovs_namespaces_are_absent(
group: typing.Pattern[str] = OPENSTACK_NODE_GROUP,
namespace: typing.Pattern[str] = OVS_NAMESPACE):
nodes = topology.list_openstack_nodes(group=group)
namespaces: typing.Dict[str, typing.List[str]] = (
collections.defaultdict(list))
for node in nodes:
for node_namespace in ip.list_network_namespaces(
ssh_client=node.ssh_client, sudo=True):
if namespace.match(node_namespace):
namespaces[node.name].append(node_namespace)
namespaces = dict(namespaces)
test_case = tobiko.get_test_case()
test_case.assertEqual(
{}, dict(namespaces),
f"OVS namespace(s) found on OpenStack nodes: {namespaces}")
OVS_INTERFACE = re.compile(r'(qvo.*|qvb.*|qbr.*|qr.*|qg.*|fg.*|sg.*)')
def test_ovs_interfaces_are_absent(
group: typing.Pattern[str] = OPENSTACK_NODE_GROUP,
interface: typing.Pattern[str] = OVS_INTERFACE):
nodes = topology.list_openstack_nodes(group=group)
interfaces: typing.Dict[str, typing.List[str]] = (
collections.defaultdict(list))
for node in nodes:
for node_interface in ip.list_network_interfaces(
ssh_client=node.ssh_client, sudo=True):
if interface.match(node_interface):
interfaces[node.name].append(node_interface)
interfaces = dict(interfaces)
test_case = tobiko.get_test_case()
test_case.assertEqual(
{}, interfaces,
f"OVS interface(s) found on OpenStack nodes: {interfaces}")