From 725543684cbe0df0edc4b6924f85e63e1628fa92 Mon Sep 17 00:00:00 2001 From: rossella Date: Thu, 5 Mar 2015 09:24:10 +0000 Subject: [PATCH] Add get_events to OVSDB monitor OVSDB monitor can generate the events that the OVS agent needs to process (device added or updated). Instead of notifying only that a change occurred and that polling is needed, pass the events to the agent Change-Id: I3d17bf995ad4508c4c6d089de550148da1465fa1 Partially-Implements: blueprint restructure-l2-agent --- neutron/agent/linux/ovsdb_monitor.py | 46 ++++++++++++++--- neutron/agent/linux/polling.py | 3 ++ neutron/tests/common/net_helpers.py | 9 +++- .../agent/linux/test_ovsdb_monitor.py | 50 +++++++++++++++++-- .../unit/agent/linux/test_ovsdb_monitor.py | 12 +++-- 5 files changed, 106 insertions(+), 14 deletions(-) diff --git a/neutron/agent/linux/ovsdb_monitor.py b/neutron/agent/linux/ovsdb_monitor.py index 7e0ef251184..f992bca25b5 100644 --- a/neutron/agent/linux/ovsdb_monitor.py +++ b/neutron/agent/linux/ovsdb_monitor.py @@ -14,13 +14,19 @@ import eventlet from oslo_log import log as logging +from oslo_serialization import jsonutils from neutron.agent.linux import async_process +from neutron.agent.ovsdb import api as ovsdb from neutron.i18n import _LE LOG = logging.getLogger(__name__) +OVSDB_ACTION_INITIAL = 'initial' +OVSDB_ACTION_INSERT = 'insert' +OVSDB_ACTION_DELETE = 'delete' + class OvsdbMonitor(async_process.AsyncProcess): """Manages an invocation of 'ovsdb-client monitor'.""" @@ -63,22 +69,50 @@ class SimpleInterfaceMonitor(OvsdbMonitor): def __init__(self, respawn_interval=None): super(SimpleInterfaceMonitor, self).__init__( 'Interface', - columns=['name', 'ofport'], + columns=['name', 'ofport', 'external_ids'], format='json', respawn_interval=respawn_interval, ) self.data_received = False + self.new_events = {'added': [], 'removed': []} @property def has_updates(self): """Indicate whether the ovsdb Interface table has been updated. - True will be returned if the monitor process is not active. - This 'failing open' minimizes the risk of falsely indicating - the absence of updates at the expense of potential false - positives. + If the monitor process is not active an error will be logged since + it won't be able to communicate any update. This situation should be + temporary if respawn_interval is set. """ - return bool(list(self.iter_stdout())) or not self.is_active() + if not self.is_active(): + LOG.error(_LE("Interface monitor is not active")) + else: + self.process_events() + return bool(self.new_events['added'] or self.new_events['removed']) + + def get_events(self): + self.process_events() + events = self.new_events + self.new_events = {'added': [], 'removed': []} + return events + + def process_events(self): + devices_added = [] + devices_removed = [] + for row in self.iter_stdout(): + json = jsonutils.loads(row).get('data') + for ovs_id, action, name, ofport, external_ids in json: + if external_ids: + external_ids = ovsdb.val_to_py(external_ids) + device = {'name': name, + 'ofport': ofport, + 'external_ids': external_ids} + if action in (OVSDB_ACTION_INITIAL, OVSDB_ACTION_INSERT): + devices_added.append(device) + elif action == OVSDB_ACTION_DELETE: + devices_removed.append(device) + self.new_events['added'].extend(devices_added) + self.new_events['removed'].extend(devices_removed) def start(self, block=False, timeout=5): super(SimpleInterfaceMonitor, self).start() diff --git a/neutron/agent/linux/polling.py b/neutron/agent/linux/polling.py index dffabf34030..ac3a4a620c2 100644 --- a/neutron/agent/linux/polling.py +++ b/neutron/agent/linux/polling.py @@ -60,3 +60,6 @@ class InterfacePollingMinimizer(base_polling.BasePollingManager): # collect output. eventlet.sleep() return self._monitor.has_updates + + def get_events(self): + return self._monitor.get_events() diff --git a/neutron/tests/common/net_helpers.py b/neutron/tests/common/net_helpers.py index 5d665f7f9ce..ae494f5f358 100644 --- a/neutron/tests/common/net_helpers.py +++ b/neutron/tests/common/net_helpers.py @@ -181,6 +181,12 @@ class OVSBridgeFixture(fixtures.Fixture): class OVSPortFixture(PortFixture): + def __init__(self, bridge=None, namespace=None, attrs=None): + super(OVSPortFixture, self).__init__(bridge, namespace) + if attrs is None: + attrs = [] + self.attrs = attrs + def _create_bridge_fixture(self): return OVSBridgeFixture() @@ -196,7 +202,8 @@ class OVSPortFixture(PortFixture): self.port.link.set_up() def create_port(self, name): - self.bridge.add_port(name, ('type', 'internal')) + self.attrs.insert(0, ('type', 'internal')) + self.bridge.add_port(name, *self.attrs) return name diff --git a/neutron/tests/functional/agent/linux/test_ovsdb_monitor.py b/neutron/tests/functional/agent/linux/test_ovsdb_monitor.py index a9ae8c2365e..fc49b1ae4d1 100644 --- a/neutron/tests/functional/agent/linux/test_ovsdb_monitor.py +++ b/neutron/tests/functional/agent/linux/test_ovsdb_monitor.py @@ -107,9 +107,51 @@ class TestSimpleInterfaceMonitor(BaseMonitorTest): utils.wait_until_true(lambda: self.monitor.data_received is True) self.assertTrue(self.monitor.has_updates, 'Initial call should always be true') - self.assertFalse(self.monitor.has_updates, - 'has_updates without port addition should be False') + # clear the event list + self.monitor.get_events() self.useFixture(net_helpers.OVSPortFixture()) # has_updates after port addition should become True - while not self.monitor.has_updates: - eventlet.sleep(0.01) + utils.wait_until_true(lambda: self.monitor.has_updates is True) + + def _expected_devices_events(self, devices, state): + """Helper to check that events are received for expected devices. + + :param devices: The list of expected devices. WARNING: This list + is modified by this method + :param state: The state of the devices (added or removed) + """ + events = self.monitor.get_events() + event_devices = [ + (dev['name'], dev['external_ids']) for dev in events.get(state)] + for dev in event_devices: + if dev[0] in devices: + devices.remove(dev[0]) + self.assertEqual(dev[1].get('iface-status'), 'active') + if not devices: + return True + + def test_get_events(self): + utils.wait_until_true(lambda: self.monitor.data_received is True) + devices = self.monitor.get_events() + self.assertTrue(devices.get('added'), + 'Initial call should always be true') + p_attrs = [('external_ids', {'iface-status': 'active'})] + br = self.useFixture(net_helpers.OVSBridgeFixture()) + p1 = self.useFixture(net_helpers.OVSPortFixture( + br.bridge, None, p_attrs)) + p2 = self.useFixture(net_helpers.OVSPortFixture( + br.bridge, None, p_attrs)) + added_devices = [p1.port.name, p2.port.name] + utils.wait_until_true( + lambda: self._expected_devices_events(added_devices, 'added')) + br.bridge.delete_port(p1.port.name) + br.bridge.delete_port(p2.port.name) + removed_devices = [p1.port.name, p2.port.name] + utils.wait_until_true( + lambda: self._expected_devices_events(removed_devices, 'removed')) + # restart + self.monitor.stop(block=True) + self.monitor.start(block=True, timeout=60) + devices = self.monitor.get_events() + self.assertTrue(devices.get('added'), + 'Initial call should always be true') diff --git a/neutron/tests/unit/agent/linux/test_ovsdb_monitor.py b/neutron/tests/unit/agent/linux/test_ovsdb_monitor.py index 9b8b9768706..604d6cc4ad8 100644 --- a/neutron/tests/unit/agent/linux/test_ovsdb_monitor.py +++ b/neutron/tests/unit/agent/linux/test_ovsdb_monitor.py @@ -55,9 +55,6 @@ class TestSimpleInterfaceMonitor(base.BaseTestCase): super(TestSimpleInterfaceMonitor, self).setUp() self.monitor = ovsdb_monitor.SimpleInterfaceMonitor() - def test_has_updates_is_true_by_default(self): - self.assertTrue(self.monitor.has_updates) - def test_has_updates_is_false_if_active_with_no_output(self): target = ('neutron.agent.linux.ovsdb_monitor.SimpleInterfaceMonitor' '.is_active') @@ -87,3 +84,12 @@ class TestSimpleInterfaceMonitor(base.BaseTestCase): return_value=output): self.monitor._read_stdout() self.assertFalse(self.monitor.data_received) + + def test_has_updates_after_calling_get_events_is_false(self): + with mock.patch.object( + self.monitor, 'process_events') as process_events: + self.monitor.new_events = {'added': ['foo'], 'removed': ['foo1']} + self.assertTrue(self.monitor.has_updates) + self.monitor.get_events() + self.assertTrue(process_events.called) + self.assertFalse(self.monitor.has_updates)