Merge "SimpleInterfaceMonitor filter events by bridge name"
This commit is contained in:
commit
5c54553224
|
@ -12,6 +12,8 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
|
||||
from oslo_log import log as logging
|
||||
from oslo_serialization import jsonutils
|
||||
|
||||
|
@ -74,7 +76,10 @@ class SimpleInterfaceMonitor(OvsdbMonitor):
|
|||
since the previous access.
|
||||
"""
|
||||
|
||||
def __init__(self, respawn_interval=None, ovsdb_connection=None):
|
||||
def __init__(self, respawn_interval=None, ovsdb_connection=None,
|
||||
bridge_names=None, ovs=None):
|
||||
self._bridge_names = bridge_names or []
|
||||
self._ovs = ovs
|
||||
super(SimpleInterfaceMonitor, self).__init__(
|
||||
'Interface',
|
||||
columns=['name', 'ofport', 'external_ids'],
|
||||
|
@ -82,6 +87,12 @@ class SimpleInterfaceMonitor(OvsdbMonitor):
|
|||
respawn_interval=respawn_interval,
|
||||
ovsdb_connection=ovsdb_connection
|
||||
)
|
||||
if self._bridge_names and self._ovs:
|
||||
LOG.warning(
|
||||
'Interface monitor is filtering events only for interfaces of '
|
||||
'ports belonging these bridges: %s. This filtering has a '
|
||||
'negative impact on the performance and is not needed in '
|
||||
'production environment!', self._bridge_names)
|
||||
|
||||
@property
|
||||
def has_updates(self):
|
||||
|
@ -133,3 +144,28 @@ class SimpleInterfaceMonitor(OvsdbMonitor):
|
|||
# update any events with ofports received from 'new' action
|
||||
for event in self.new_events['added']:
|
||||
event['ofport'] = dev_to_ofport.get(event['name'], event['ofport'])
|
||||
|
||||
self.new_events = self._filter_events(self.new_events)
|
||||
|
||||
def _filter_events(self, events):
|
||||
if not (self._bridge_names and self._ovs):
|
||||
return events
|
||||
|
||||
port_to_bridge = {}
|
||||
events_filtered = collections.defaultdict(list)
|
||||
for device in events['added']:
|
||||
bridge_name = self._ovs.get_bridge_for_iface(device['name'])
|
||||
if bridge_name in self._bridge_names:
|
||||
port_to_bridge[device['name']] = bridge_name
|
||||
events_filtered['added'].append(device)
|
||||
|
||||
for (etype, devs) in ((etype, devs) for (etype, devs) in events.items()
|
||||
if etype in ('removed', 'modified')):
|
||||
for device in devs:
|
||||
bridge_name = port_to_bridge.get(device['name'])
|
||||
if etype == 'removed':
|
||||
port_to_bridge.pop(device['name'], None)
|
||||
if bridge_name in self._bridge_names:
|
||||
events_filtered[etype].append(device)
|
||||
|
||||
return events_filtered
|
||||
|
|
|
@ -30,10 +30,12 @@ LOG = logging.getLogger(__name__)
|
|||
@contextlib.contextmanager
|
||||
def get_polling_manager(minimize_polling=False,
|
||||
ovsdb_monitor_respawn_interval=(
|
||||
constants.DEFAULT_OVSDBMON_RESPAWN)):
|
||||
constants.DEFAULT_OVSDBMON_RESPAWN),
|
||||
bridge_names=None, ovs=None):
|
||||
if minimize_polling:
|
||||
pm = InterfacePollingMinimizer(
|
||||
ovsdb_monitor_respawn_interval=ovsdb_monitor_respawn_interval)
|
||||
ovsdb_monitor_respawn_interval=ovsdb_monitor_respawn_interval,
|
||||
bridge_names=bridge_names, ovs=ovs)
|
||||
pm.start()
|
||||
else:
|
||||
pm = base_polling.AlwaysPoll()
|
||||
|
@ -49,12 +51,14 @@ class InterfacePollingMinimizer(base_polling.BasePollingManager):
|
|||
|
||||
def __init__(
|
||||
self,
|
||||
ovsdb_monitor_respawn_interval=constants.DEFAULT_OVSDBMON_RESPAWN):
|
||||
ovsdb_monitor_respawn_interval=constants.DEFAULT_OVSDBMON_RESPAWN,
|
||||
bridge_names=None, ovs=None):
|
||||
|
||||
super(InterfacePollingMinimizer, self).__init__()
|
||||
self._monitor = ovsdb_monitor.SimpleInterfaceMonitor(
|
||||
respawn_interval=ovsdb_monitor_respawn_interval,
|
||||
ovsdb_connection=cfg.CONF.OVS.ovsdb_connection)
|
||||
ovsdb_connection=cfg.CONF.OVS.ovsdb_connection,
|
||||
bridge_names=bridge_names, ovs=ovs)
|
||||
|
||||
def start(self):
|
||||
self._monitor.start(block=True)
|
||||
|
@ -73,3 +77,16 @@ class InterfacePollingMinimizer(base_polling.BasePollingManager):
|
|||
|
||||
def get_events(self):
|
||||
return self._monitor.get_events()
|
||||
|
||||
|
||||
def filter_bridge_names(br_names):
|
||||
"""Bridge names to filter events received in the Interface monitor
|
||||
|
||||
This method is used only in fullstack testing. Because several OVS agents
|
||||
are executed in the same host and share the same OVS switch, this filtering
|
||||
will remove events of other agents; the Interface monitor will only return
|
||||
events of Interfaces attached to Ports that belong to bridges "br_names".
|
||||
|
||||
If the list is empty, no filtering is done.
|
||||
"""
|
||||
return []
|
||||
|
|
|
@ -2697,9 +2697,12 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
|
|||
br_names = [br.br_name for br in self.phys_brs.values()]
|
||||
|
||||
self.ovs.ovsdb.idl_monitor.start_bridge_monitor(br_names)
|
||||
bridge_names = polling.filter_bridge_names([self.int_br.br_name])
|
||||
with polling.get_polling_manager(
|
||||
self.minimize_polling,
|
||||
self.ovsdb_monitor_respawn_interval) as pm:
|
||||
self.ovsdb_monitor_respawn_interval,
|
||||
bridge_names=bridge_names,
|
||||
ovs=self.ovs) as pm:
|
||||
self.rpc_loop(polling_manager=pm)
|
||||
|
||||
def _handle_sigterm(self, signum, frame):
|
||||
|
|
|
@ -19,6 +19,7 @@ from unittest import mock
|
|||
from oslo_config import cfg
|
||||
|
||||
from neutron.agent.common import ovs_lib
|
||||
from neutron.agent.common import polling
|
||||
from neutron.agent.l2.extensions import qos as qos_extension
|
||||
from neutron.services.trunk.drivers.openvswitch.agent \
|
||||
import driver as trunk_driver
|
||||
|
@ -46,6 +47,15 @@ def monkeypatch_qos():
|
|||
'_process_reset_port').start()
|
||||
|
||||
|
||||
def monkeypatch_event_filtering():
|
||||
def filter_bridge_names(br_names):
|
||||
if 'trunk' in cfg.CONF.service_plugins:
|
||||
return []
|
||||
return br_names
|
||||
|
||||
polling.filter_bridge_names = filter_bridge_names
|
||||
|
||||
|
||||
def main():
|
||||
# TODO(slaweq): this monkepatch will not be necessary when
|
||||
# https://review.opendev.org/#/c/506722/ will be merged and ovsdb-server
|
||||
|
@ -53,6 +63,7 @@ def main():
|
|||
# namespace
|
||||
monkeypatch_init_handler()
|
||||
monkeypatch_qos()
|
||||
monkeypatch_event_filtering()
|
||||
ovs_agent.main()
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,82 @@
|
|||
# Copyright (c) 2020 Red Hat, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from neutron.agent.common import ovsdb_monitor
|
||||
from neutron.common import utils as common_utils
|
||||
from neutron.tests.common import net_helpers
|
||||
from neutron.tests.functional import base
|
||||
|
||||
|
||||
class SimpleInterfaceMonitorTestCase(base.BaseSudoTestCase):
|
||||
|
||||
def _stop_monitors(self, monitors):
|
||||
for monitor in monitors:
|
||||
monitor.stop()
|
||||
|
||||
def _check_port_events(self, monitor, ports_expected=None,
|
||||
ports_not_expected=None):
|
||||
ports_expected = ports_expected or set([])
|
||||
ports_not_expected = ports_not_expected or set([])
|
||||
added_events = monitor.get_events().get('added', [])
|
||||
added_port_names = {port['name'] for port in added_events}
|
||||
intersection = ports_not_expected & added_port_names
|
||||
if intersection:
|
||||
self.fail('Interface monitor filtering events for bridges %s '
|
||||
'received an event for those ports %s' %
|
||||
(monitor._bridge_names, intersection))
|
||||
return ports_expected - added_port_names
|
||||
|
||||
def test_interface_monitor_filtering(self):
|
||||
br_1 = self.useFixture(net_helpers.OVSBridgeFixture()).bridge
|
||||
br_2 = self.useFixture(net_helpers.OVSBridgeFixture()).bridge
|
||||
|
||||
mon_no_filter = ovsdb_monitor.SimpleInterfaceMonitor(
|
||||
respawn_interval=30,
|
||||
ovsdb_connection=cfg.CONF.OVS.ovsdb_connection)
|
||||
mon_no_filter.start(block=True)
|
||||
mon_br_1 = ovsdb_monitor.SimpleInterfaceMonitor(
|
||||
respawn_interval=30,
|
||||
ovsdb_connection=cfg.CONF.OVS.ovsdb_connection,
|
||||
bridge_names=[br_1.br_name], ovs=br_1)
|
||||
mon_br_1.start(block=True)
|
||||
mon_br_2 = ovsdb_monitor.SimpleInterfaceMonitor(
|
||||
respawn_interval=30,
|
||||
ovsdb_connection=cfg.CONF.OVS.ovsdb_connection,
|
||||
bridge_names=[br_2.br_name], ovs=br_1)
|
||||
mon_br_2.start(block=True)
|
||||
self.addCleanup(self._stop_monitors,
|
||||
[mon_no_filter, mon_br_1, mon_br_2])
|
||||
|
||||
p1 = self.useFixture(net_helpers.OVSPortFixture(br_1))
|
||||
p2 = self.useFixture(net_helpers.OVSPortFixture(br_2))
|
||||
|
||||
ports_expected = {p1.port.name, p2.port.name}
|
||||
try:
|
||||
common_utils.wait_until_true(
|
||||
lambda: not self._check_port_events(
|
||||
mon_no_filter, ports_expected=ports_expected),
|
||||
timeout=5)
|
||||
except common_utils.WaitTimeout:
|
||||
self.fail('Interface monitor not filtered did not received an '
|
||||
'event for ports %s' % ports_expected)
|
||||
|
||||
self.assertIs(0, len(self._check_port_events(
|
||||
mon_br_1, ports_expected={p1.port.name},
|
||||
ports_not_expected={p2.port.name})))
|
||||
self.assertIs(0, len(self._check_port_events(
|
||||
mon_br_2, ports_expected={p2.port.name},
|
||||
ports_not_expected={p1.port.name})))
|
|
@ -1995,8 +1995,9 @@ class TestOvsNeutronAgent(object):
|
|||
mock.patch.object(self.agent.ovs.ovsdb, 'idl_monitor') as \
|
||||
mock_idl_monitor:
|
||||
self.agent.daemon_loop()
|
||||
mock_get_pm.assert_called_with(True,
|
||||
constants.DEFAULT_OVSDBMON_RESPAWN)
|
||||
mock_get_pm.assert_called_with(
|
||||
True, constants.DEFAULT_OVSDBMON_RESPAWN, bridge_names=[],
|
||||
ovs=self.agent.ovs)
|
||||
mock_loop.assert_called_once_with(polling_manager=mock.ANY)
|
||||
mock_idl_monitor.start_bridge_monitor.assert_called()
|
||||
|
||||
|
|
Loading…
Reference in New Issue