SimpleInterfaceMonitor filter events by bridge name
Add a filter in SimpleInterfaceMonitor. If provided, the Interface events received will be filtered by port. For each Interface event received, the bridge name will be retrieved and compared with the list of bridge names given. If no names are provided, the monitor won't filter any event (current behavior). This filter is meant to be used in Fullstack test only. This filtering add an extra overload (two OVS DB queries per event) that should not be needed in a production environment. Closes-Bug: #1885547 Change-Id: Ie1fc8cf7d29c71eb358e593726b446787d8022c2
This commit is contained in:
parent
b70a6e97dc
commit
dcf7acf2c6
|
@ -12,6 +12,8 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
|
||||
from oslo_log import log as logging
|
||||
from oslo_serialization import jsonutils
|
||||
|
||||
|
@ -74,7 +76,10 @@ class SimpleInterfaceMonitor(OvsdbMonitor):
|
|||
since the previous access.
|
||||
"""
|
||||
|
||||
def __init__(self, respawn_interval=None, ovsdb_connection=None):
|
||||
def __init__(self, respawn_interval=None, ovsdb_connection=None,
|
||||
bridge_names=None, ovs=None):
|
||||
self._bridge_names = bridge_names or []
|
||||
self._ovs = ovs
|
||||
super(SimpleInterfaceMonitor, self).__init__(
|
||||
'Interface',
|
||||
columns=['name', 'ofport', 'external_ids'],
|
||||
|
@ -82,6 +87,12 @@ class SimpleInterfaceMonitor(OvsdbMonitor):
|
|||
respawn_interval=respawn_interval,
|
||||
ovsdb_connection=ovsdb_connection
|
||||
)
|
||||
if self._bridge_names and self._ovs:
|
||||
LOG.warning(
|
||||
'Interface monitor is filtering events only for interfaces of '
|
||||
'ports belonging these bridges: %s. This filtering has a '
|
||||
'negative impact on the performance and is not needed in '
|
||||
'production environment!', self._bridge_names)
|
||||
|
||||
@property
|
||||
def has_updates(self):
|
||||
|
@ -133,3 +144,28 @@ class SimpleInterfaceMonitor(OvsdbMonitor):
|
|||
# update any events with ofports received from 'new' action
|
||||
for event in self.new_events['added']:
|
||||
event['ofport'] = dev_to_ofport.get(event['name'], event['ofport'])
|
||||
|
||||
self.new_events = self._filter_events(self.new_events)
|
||||
|
||||
def _filter_events(self, events):
|
||||
if not (self._bridge_names and self._ovs):
|
||||
return events
|
||||
|
||||
port_to_bridge = {}
|
||||
events_filtered = collections.defaultdict(list)
|
||||
for device in events['added']:
|
||||
bridge_name = self._ovs.get_bridge_for_iface(device['name'])
|
||||
if bridge_name in self._bridge_names:
|
||||
port_to_bridge[device['name']] = bridge_name
|
||||
events_filtered['added'].append(device)
|
||||
|
||||
for (etype, devs) in ((etype, devs) for (etype, devs) in events.items()
|
||||
if etype in ('removed', 'modified')):
|
||||
for device in devs:
|
||||
bridge_name = port_to_bridge.get(device['name'])
|
||||
if etype == 'removed':
|
||||
port_to_bridge.pop(device['name'], None)
|
||||
if bridge_name in self._bridge_names:
|
||||
events_filtered[etype].append(device)
|
||||
|
||||
return events_filtered
|
||||
|
|
|
@ -30,10 +30,12 @@ LOG = logging.getLogger(__name__)
|
|||
@contextlib.contextmanager
|
||||
def get_polling_manager(minimize_polling=False,
|
||||
ovsdb_monitor_respawn_interval=(
|
||||
constants.DEFAULT_OVSDBMON_RESPAWN)):
|
||||
constants.DEFAULT_OVSDBMON_RESPAWN),
|
||||
bridge_names=None, ovs=None):
|
||||
if minimize_polling:
|
||||
pm = InterfacePollingMinimizer(
|
||||
ovsdb_monitor_respawn_interval=ovsdb_monitor_respawn_interval)
|
||||
ovsdb_monitor_respawn_interval=ovsdb_monitor_respawn_interval,
|
||||
bridge_names=bridge_names, ovs=ovs)
|
||||
pm.start()
|
||||
else:
|
||||
pm = base_polling.AlwaysPoll()
|
||||
|
@ -49,12 +51,14 @@ class InterfacePollingMinimizer(base_polling.BasePollingManager):
|
|||
|
||||
def __init__(
|
||||
self,
|
||||
ovsdb_monitor_respawn_interval=constants.DEFAULT_OVSDBMON_RESPAWN):
|
||||
ovsdb_monitor_respawn_interval=constants.DEFAULT_OVSDBMON_RESPAWN,
|
||||
bridge_names=None, ovs=None):
|
||||
|
||||
super(InterfacePollingMinimizer, self).__init__()
|
||||
self._monitor = ovsdb_monitor.SimpleInterfaceMonitor(
|
||||
respawn_interval=ovsdb_monitor_respawn_interval,
|
||||
ovsdb_connection=cfg.CONF.OVS.ovsdb_connection)
|
||||
ovsdb_connection=cfg.CONF.OVS.ovsdb_connection,
|
||||
bridge_names=bridge_names, ovs=ovs)
|
||||
|
||||
def start(self):
|
||||
self._monitor.start(block=True)
|
||||
|
@ -73,3 +77,16 @@ class InterfacePollingMinimizer(base_polling.BasePollingManager):
|
|||
|
||||
def get_events(self):
|
||||
return self._monitor.get_events()
|
||||
|
||||
|
||||
def filter_bridge_names(br_names):
|
||||
"""Bridge names to filter events received in the Interface monitor
|
||||
|
||||
This method is used only in fullstack testing. Because several OVS agents
|
||||
are executed in the same host and share the same OVS switch, this filtering
|
||||
will remove events of other agents; the Interface monitor will only return
|
||||
events of Interfaces attached to Ports that belong to bridges "br_names".
|
||||
|
||||
If the list is empty, no filtering is done.
|
||||
"""
|
||||
return []
|
||||
|
|
|
@ -2726,9 +2726,12 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
|
|||
br_names = [br.br_name for br in self.phys_brs.values()]
|
||||
|
||||
self.ovs.ovsdb.idl_monitor.start_bridge_monitor(br_names)
|
||||
bridge_names = polling.filter_bridge_names([self.int_br.br_name])
|
||||
with polling.get_polling_manager(
|
||||
self.minimize_polling,
|
||||
self.ovsdb_monitor_respawn_interval) as pm:
|
||||
self.ovsdb_monitor_respawn_interval,
|
||||
bridge_names=bridge_names,
|
||||
ovs=self.ovs) as pm:
|
||||
self.rpc_loop(polling_manager=pm)
|
||||
|
||||
def _handle_sigterm(self, signum, frame):
|
||||
|
|
|
@ -19,6 +19,7 @@ from unittest import mock
|
|||
from oslo_config import cfg
|
||||
|
||||
from neutron.agent.common import ovs_lib
|
||||
from neutron.agent.common import polling
|
||||
from neutron.agent.l2.extensions import qos as qos_extension
|
||||
from neutron.services.trunk.drivers.openvswitch.agent \
|
||||
import driver as trunk_driver
|
||||
|
@ -46,6 +47,15 @@ def monkeypatch_qos():
|
|||
'_process_reset_port').start()
|
||||
|
||||
|
||||
def monkeypatch_event_filtering():
|
||||
def filter_bridge_names(br_names):
|
||||
if 'trunk' in cfg.CONF.service_plugins:
|
||||
return []
|
||||
return br_names
|
||||
|
||||
polling.filter_bridge_names = filter_bridge_names
|
||||
|
||||
|
||||
def main():
|
||||
# TODO(slaweq): this monkepatch will not be necessary when
|
||||
# https://review.opendev.org/#/c/506722/ will be merged and ovsdb-server
|
||||
|
@ -53,6 +63,7 @@ def main():
|
|||
# namespace
|
||||
monkeypatch_init_handler()
|
||||
monkeypatch_qos()
|
||||
monkeypatch_event_filtering()
|
||||
ovs_agent.main()
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,82 @@
|
|||
# Copyright (c) 2020 Red Hat, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from neutron.agent.common import ovsdb_monitor
|
||||
from neutron.common import utils as common_utils
|
||||
from neutron.tests.common import net_helpers
|
||||
from neutron.tests.functional import base
|
||||
|
||||
|
||||
class SimpleInterfaceMonitorTestCase(base.BaseSudoTestCase):
|
||||
|
||||
def _stop_monitors(self, monitors):
|
||||
for monitor in monitors:
|
||||
monitor.stop()
|
||||
|
||||
def _check_port_events(self, monitor, ports_expected=None,
|
||||
ports_not_expected=None):
|
||||
ports_expected = ports_expected or set([])
|
||||
ports_not_expected = ports_not_expected or set([])
|
||||
added_events = monitor.get_events().get('added', [])
|
||||
added_port_names = {port['name'] for port in added_events}
|
||||
intersection = ports_not_expected & added_port_names
|
||||
if intersection:
|
||||
self.fail('Interface monitor filtering events for bridges %s '
|
||||
'received an event for those ports %s' %
|
||||
(monitor._bridge_names, intersection))
|
||||
return ports_expected - added_port_names
|
||||
|
||||
def test_interface_monitor_filtering(self):
|
||||
br_1 = self.useFixture(net_helpers.OVSBridgeFixture()).bridge
|
||||
br_2 = self.useFixture(net_helpers.OVSBridgeFixture()).bridge
|
||||
|
||||
mon_no_filter = ovsdb_monitor.SimpleInterfaceMonitor(
|
||||
respawn_interval=30,
|
||||
ovsdb_connection=cfg.CONF.OVS.ovsdb_connection)
|
||||
mon_no_filter.start(block=True)
|
||||
mon_br_1 = ovsdb_monitor.SimpleInterfaceMonitor(
|
||||
respawn_interval=30,
|
||||
ovsdb_connection=cfg.CONF.OVS.ovsdb_connection,
|
||||
bridge_names=[br_1.br_name], ovs=br_1)
|
||||
mon_br_1.start(block=True)
|
||||
mon_br_2 = ovsdb_monitor.SimpleInterfaceMonitor(
|
||||
respawn_interval=30,
|
||||
ovsdb_connection=cfg.CONF.OVS.ovsdb_connection,
|
||||
bridge_names=[br_2.br_name], ovs=br_1)
|
||||
mon_br_2.start(block=True)
|
||||
self.addCleanup(self._stop_monitors,
|
||||
[mon_no_filter, mon_br_1, mon_br_2])
|
||||
|
||||
p1 = self.useFixture(net_helpers.OVSPortFixture(br_1))
|
||||
p2 = self.useFixture(net_helpers.OVSPortFixture(br_2))
|
||||
|
||||
ports_expected = {p1.port.name, p2.port.name}
|
||||
try:
|
||||
common_utils.wait_until_true(
|
||||
lambda: not self._check_port_events(
|
||||
mon_no_filter, ports_expected=ports_expected),
|
||||
timeout=5)
|
||||
except common_utils.WaitTimeout:
|
||||
self.fail('Interface monitor not filtered did not received an '
|
||||
'event for ports %s' % ports_expected)
|
||||
|
||||
self.assertIs(0, len(self._check_port_events(
|
||||
mon_br_1, ports_expected={p1.port.name},
|
||||
ports_not_expected={p2.port.name})))
|
||||
self.assertIs(0, len(self._check_port_events(
|
||||
mon_br_2, ports_expected={p2.port.name},
|
||||
ports_not_expected={p1.port.name})))
|
|
@ -2024,8 +2024,9 @@ class TestOvsNeutronAgent(object):
|
|||
mock.patch.object(self.agent.ovs.ovsdb, 'idl_monitor') as \
|
||||
mock_idl_monitor:
|
||||
self.agent.daemon_loop()
|
||||
mock_get_pm.assert_called_with(True,
|
||||
constants.DEFAULT_OVSDBMON_RESPAWN)
|
||||
mock_get_pm.assert_called_with(
|
||||
True, constants.DEFAULT_OVSDBMON_RESPAWN, bridge_names=[],
|
||||
ovs=self.agent.ovs)
|
||||
mock_loop.assert_called_once_with(polling_manager=mock.ANY)
|
||||
mock_idl_monitor.start_bridge_monitor.assert_called()
|
||||
|
||||
|
|
Loading…
Reference in New Issue