From 6bf67e0a576bd573042742f9b4bfc464d544c837 Mon Sep 17 00:00:00 2001 From: Darragh O'Reilly Date: Mon, 3 Mar 2014 17:21:04 +0000 Subject: [PATCH] lb-agent: ensure removed devices get treated on resyncs The previous exception handling in daemon_loop reprocessed all existing devices as if they were new devices. However, it did not ensure that any removed devices got cleaned up properly. This could result in leftover security group iptables rules from deleted instances. This patch refactors daemon_loop and scan_devices so devices that were flagged for cleaning will get re-treated in the next iteration if there is an exception. Change-Id: Ieada34ad315c0c29aa8462ebf041a448fde007b8 Closes-Bug: 1145610 --- .../agent/linuxbridge_neutron_agent.py | 84 +++++---- .../unit/linuxbridge/test_lb_neutron_agent.py | 174 +++++++++++------- 2 files changed, 154 insertions(+), 104 deletions(-) diff --git a/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py b/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py index fa60b19dd0c..f5a4507f6eb 100755 --- a/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py +++ b/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py @@ -934,15 +934,45 @@ class LinuxBridgeNeutronAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin): self.br_mgr.remove_empty_bridges() return resync - def scan_devices(self, registered_devices, updated_devices): - curr_devices = self.br_mgr.get_tap_devices() + def scan_devices(self, previous, sync): device_info = {} - device_info['current'] = curr_devices - device_info['added'] = curr_devices - registered_devices - # we don't want to process updates for devices that don't exist - device_info['updated'] = updated_devices & curr_devices - # we need to clean up after devices are removed - device_info['removed'] = registered_devices - curr_devices + + # Save and reinitialise the set variable that the port_update RPC uses. 
+ # This should be thread-safe as the greenthread should not yield + # between these two statements. + updated_devices = self.updated_devices + self.updated_devices = set() + + current_devices = self.br_mgr.get_tap_devices() + device_info['current'] = current_devices + + if previous is None: + # This is the first iteration of daemon_loop(). + previous = {'added': set(), + 'current': set(), + 'updated': set(), + 'removed': set()} + + if sync: + # This is the first iteration, or the previous one had a problem. + # Re-add all existing devices. + device_info['added'] = current_devices + + # Retry cleaning devices that may not have been cleaned properly. + # And clean any that disappeared since the previous iteration. + device_info['removed'] = (previous['removed'] | previous['current'] + - current_devices) + + # Retry updating devices that may not have been updated properly. + # And any that were updated since the previous iteration. + # Only update devices that currently exist. + device_info['updated'] = (previous['updated'] | updated_devices + & current_devices) + else: + device_info['added'] = current_devices - previous['current'] + device_info['removed'] = previous['current'] - current_devices + device_info['updated'] = updated_devices & current_devices + return device_info def _device_info_has_changes(self, device_info): @@ -951,39 +981,27 @@ class LinuxBridgeNeutronAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin): or device_info.get('removed')) def daemon_loop(self): - sync = True - devices = set() - LOG.info(_("LinuxBridge Agent RPC Daemon Started!")) + device_info = None + sync = True while True: start = time.time() + + device_info = self.scan_devices(previous=device_info, sync=sync) + if sync: LOG.info(_("Agent out of sync with plugin!")) - devices.clear() sync = False - device_info = {} - # Save updated devices dict to perform rollback in case - # resync would be needed, and then clear self.updated_devices. 
- # As the greenthread should not yield between these - # two statements, this will should be thread-safe. - updated_devices_copy = self.updated_devices - self.updated_devices = set() - try: - device_info = self.scan_devices(devices, updated_devices_copy) - if self._device_info_has_changes(device_info): - LOG.debug(_("Agent loop found changes! %s"), device_info) - # If treat devices fails - indicates must resync with - # plugin + + if self._device_info_has_changes(device_info): + LOG.debug(_("Agent loop found changes! %s"), device_info) + try: sync = self.process_network_devices(device_info) - devices = device_info['current'] - except Exception: - LOG.exception(_("Error in agent loop. Devices info: %s"), - device_info) - sync = True - # Restore devices that were removed from this set earlier - # without overwriting ones that may have arrived since. - self.updated_devices |= updated_devices_copy + except Exception: + LOG.exception(_("Error in agent loop. Devices info: %s"), + device_info) + sync = True # sleep till end of polling interval elapsed = (time.time() - start) diff --git a/neutron/tests/unit/linuxbridge/test_lb_neutron_agent.py b/neutron/tests/unit/linuxbridge/test_lb_neutron_agent.py index a2a8b1785bd..0c4661267d8 100644 --- a/neutron/tests/unit/linuxbridge/test_lb_neutron_agent.py +++ b/neutron/tests/unit/linuxbridge/test_lb_neutron_agent.py @@ -17,7 +17,6 @@ import os import mock from oslo.config import cfg -import testtools from neutron.agent.linux import ip_lib from neutron.agent.linux import utils @@ -168,88 +167,121 @@ class TestLinuxBridgeAgent(base.BaseTestCase): self.assertTrue(fn_udd.called) self.assertTrue(fn_rdf.called) - def test_loop_restores_updated_devices_on_exception(self): - agent = self.agent - agent.updated_devices = set(['tap1', 'tap2']) - - with contextlib.nested( - mock.patch.object(agent, 'scan_devices'), - mock.patch.object(linuxbridge_neutron_agent.LOG, 'info'), - mock.patch.object(agent, 'process_network_devices') - ) as 
(scan_devices, log, process_network_devices): - # Simulate effect of 2 port_update()s when loop is running. - # And break out of loop at start of 2nd iteration. - log.side_effect = [agent.updated_devices.add('tap3'), - agent.updated_devices.add('tap4'), - ValueError] - scan_devices.side_effect = RuntimeError - - with testtools.ExpectedException(ValueError): - agent.daemon_loop() - - # Check that the originals {tap1,tap2} have been restored - # and the new updates {tap3, tap4} have not been overwritten. - self.assertEqual(set(['tap1', 'tap2', 'tap3', 'tap4']), - agent.updated_devices) - self.assertEqual(3, log.call_count) - - def mock_scan_devices(self, expected, mock_current, - registered_devices, updated_devices): + def _test_scan_devices(self, previous, updated, + fake_current, expected, sync): self.agent.br_mgr = mock.Mock() - self.agent.br_mgr.get_tap_devices.return_value = mock_current + self.agent.br_mgr.get_tap_devices.return_value = fake_current - results = self.agent.scan_devices(registered_devices, updated_devices) + self.agent.updated_devices = updated + results = self.agent.scan_devices(previous, sync) self.assertEqual(expected, results) - def test_scan_devices_returns_empty_sets(self): - registered = set() - updated = set() - mock_current = set() - expected = {'current': set(), - 'updated': set(), - 'added': set(), - 'removed': set()} - self.mock_scan_devices(expected, mock_current, registered, updated) - def test_scan_devices_no_changes(self): - registered = set(['tap1', 'tap2']) - updated = set() - mock_current = set(['tap1', 'tap2']) - expected = {'current': set(['tap1', 'tap2']), + previous = {'current': set([1, 2]), 'updated': set(), 'added': set(), 'removed': set()} - self.mock_scan_devices(expected, mock_current, registered, updated) - - def test_scan_devices_new_and_removed(self): - registered = set(['tap1', 'tap2']) + fake_current = set([1, 2]) updated = set() - mock_current = set(['tap2', 'tap3']) - expected = {'current': set(['tap2', 
'tap3']), - 'updated': set(), - 'added': set(['tap3']), - 'removed': set(['tap1'])} - self.mock_scan_devices(expected, mock_current, registered, updated) - - def test_scan_devices_new_updates(self): - registered = set(['tap1']) - updated = set(['tap2']) - mock_current = set(['tap1', 'tap2']) - expected = {'current': set(['tap1', 'tap2']), - 'updated': set(['tap2']), - 'added': set(['tap2']), - 'removed': set()} - self.mock_scan_devices(expected, mock_current, registered, updated) - - def test_scan_devices_updated_missing(self): - registered = set(['tap1']) - updated = set(['tap2']) - mock_current = set(['tap1']) - expected = {'current': set(['tap1']), + expected = {'current': set([1, 2]), 'updated': set(), 'added': set(), 'removed': set()} - self.mock_scan_devices(expected, mock_current, registered, updated) + + self._test_scan_devices(previous, updated, fake_current, expected, + sync=False) + + def test_scan_devices_added_removed(self): + previous = {'current': set([1, 2]), + 'updated': set(), + 'added': set(), + 'removed': set()} + fake_current = set([2, 3]) + updated = set() + expected = {'current': set([2, 3]), + 'updated': set(), + 'added': set([3]), + 'removed': set([1])} + + self._test_scan_devices(previous, updated, fake_current, expected, + sync=False) + + def test_scan_devices_removed_retried_on_sync(self): + previous = {'current': set([2, 3]), + 'updated': set(), + 'added': set(), + 'removed': set([1])} + fake_current = set([2, 3]) + updated = set() + expected = {'current': set([2, 3]), + 'updated': set(), + 'added': set([2, 3]), + 'removed': set([1])} + + self._test_scan_devices(previous, updated, fake_current, expected, + sync=True) + + def test_scan_devices_vanished_removed_on_sync(self): + previous = {'current': set([2, 3]), + 'updated': set(), + 'added': set(), + 'removed': set([1])} + # Device 2 disappeared. + fake_current = set([3]) + updated = set() + # Device 1 should be retried. 
+ expected = {'current': set([3]), + 'updated': set(), + 'added': set([3]), + 'removed': set([1, 2])} + + self._test_scan_devices(previous, updated, fake_current, expected, + sync=True) + + def test_scan_devices_updated(self): + previous = {'current': set([1, 2]), + 'updated': set(), + 'added': set(), + 'removed': set()} + fake_current = set([1, 2]) + updated = set([1]) + expected = {'current': set([1, 2]), + 'updated': set([1]), + 'added': set(), + 'removed': set()} + + self._test_scan_devices(previous, updated, fake_current, expected, + sync=False) + + def test_scan_devices_updated_non_existing(self): + previous = {'current': set([1, 2]), + 'updated': set(), + 'added': set(), + 'removed': set()} + fake_current = set([1, 2]) + updated = set([3]) + expected = {'current': set([1, 2]), + 'updated': set(), + 'added': set(), + 'removed': set()} + + self._test_scan_devices(previous, updated, fake_current, expected, + sync=False) + + def test_scan_devices_updated_on_sync(self): + previous = {'current': set([1, 2]), + 'updated': set([1]), + 'added': set(), + 'removed': set()} + fake_current = set([1, 2]) + updated = set([2]) + expected = {'current': set([1, 2]), + 'updated': set([1, 2]), + 'added': set([1, 2]), + 'removed': set()} + + self._test_scan_devices(previous, updated, fake_current, expected, + sync=True) def test_process_network_devices(self): agent = self.agent