lb-agent: ensure removed devices get treated on resyncs

The previous exception handling in daemon_loop reprocessed all existing
devices as if they were new devices. However, it did not ensure that
any removed devices got cleaned up properly. This could result in
leftover security group iptables rules from deleted instances.

This patch refactors daemon_loop and scan_devices so devices that were
flagged for cleaning will get retreated in the next iteration if there
is an exception.

Change-Id: Ieada34ad315c0c29aa8462ebf041a448fde007b8
Closes-Bug: 1145610
This commit is contained in:
Darragh O'Reilly 2014-03-03 17:21:04 +00:00
parent fe57b966ac
commit 6bf67e0a57
2 changed files with 154 additions and 104 deletions

View File

@ -934,15 +934,45 @@ class LinuxBridgeNeutronAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin):
self.br_mgr.remove_empty_bridges()
return resync
def scan_devices(self, registered_devices, updated_devices):
curr_devices = self.br_mgr.get_tap_devices()
def scan_devices(self, previous, sync):
device_info = {}
device_info['current'] = curr_devices
device_info['added'] = curr_devices - registered_devices
# we don't want to process updates for devices that don't exist
device_info['updated'] = updated_devices & curr_devices
# we need to clean up after devices are removed
device_info['removed'] = registered_devices - curr_devices
# Save and reinitialise the set variable that the port_update RPC uses.
# This should be thread-safe as the greenthread should not yield
# between these two statements.
updated_devices = self.updated_devices
self.updated_devices = set()
current_devices = self.br_mgr.get_tap_devices()
device_info['current'] = current_devices
if previous is None:
# This is the first iteration of daemon_loop().
previous = {'added': set(),
'current': set(),
'updated': set(),
'removed': set()}
if sync:
# This is the first iteration, or the previous one had a problem.
# Re-add all existing devices.
device_info['added'] = current_devices
# Retry cleaning devices that may not have been cleaned properly.
# And clean any that disappeared since the previous iteration.
device_info['removed'] = (previous['removed'] | previous['current']
- current_devices)
# Retry updating devices that may not have been updated properly.
# And any that were updated since the previous iteration.
# Only update devices that currently exist.
device_info['updated'] = (previous['updated'] | updated_devices
& current_devices)
else:
device_info['added'] = current_devices - previous['current']
device_info['removed'] = previous['current'] - current_devices
device_info['updated'] = updated_devices & current_devices
return device_info
def _device_info_has_changes(self, device_info):
@ -951,39 +981,27 @@ class LinuxBridgeNeutronAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin):
or device_info.get('removed'))
def daemon_loop(self):
sync = True
devices = set()
LOG.info(_("LinuxBridge Agent RPC Daemon Started!"))
device_info = None
sync = True
while True:
start = time.time()
device_info = self.scan_devices(previous=device_info, sync=sync)
if sync:
LOG.info(_("Agent out of sync with plugin!"))
devices.clear()
sync = False
device_info = {}
# Save updated devices dict to perform rollback in case
# resync would be needed, and then clear self.updated_devices.
# As the greenthread should not yield between these
# two statements, this will should be thread-safe.
updated_devices_copy = self.updated_devices
self.updated_devices = set()
try:
device_info = self.scan_devices(devices, updated_devices_copy)
if self._device_info_has_changes(device_info):
LOG.debug(_("Agent loop found changes! %s"), device_info)
# If treat devices fails - indicates must resync with
# plugin
if self._device_info_has_changes(device_info):
LOG.debug(_("Agent loop found changes! %s"), device_info)
try:
sync = self.process_network_devices(device_info)
devices = device_info['current']
except Exception:
LOG.exception(_("Error in agent loop. Devices info: %s"),
device_info)
sync = True
# Restore devices that were removed from this set earlier
# without overwriting ones that may have arrived since.
self.updated_devices |= updated_devices_copy
except Exception:
LOG.exception(_("Error in agent loop. Devices info: %s"),
device_info)
sync = True
# sleep till end of polling interval
elapsed = (time.time() - start)

View File

@ -17,7 +17,6 @@ import os
import mock
from oslo.config import cfg
import testtools
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
@ -168,88 +167,121 @@ class TestLinuxBridgeAgent(base.BaseTestCase):
self.assertTrue(fn_udd.called)
self.assertTrue(fn_rdf.called)
def test_loop_restores_updated_devices_on_exception(self):
agent = self.agent
agent.updated_devices = set(['tap1', 'tap2'])
with contextlib.nested(
mock.patch.object(agent, 'scan_devices'),
mock.patch.object(linuxbridge_neutron_agent.LOG, 'info'),
mock.patch.object(agent, 'process_network_devices')
) as (scan_devices, log, process_network_devices):
# Simulate effect of 2 port_update()s when loop is running.
# And break out of loop at start of 2nd iteration.
log.side_effect = [agent.updated_devices.add('tap3'),
agent.updated_devices.add('tap4'),
ValueError]
scan_devices.side_effect = RuntimeError
with testtools.ExpectedException(ValueError):
agent.daemon_loop()
# Check that the originals {tap1,tap2} have been restored
# and the new updates {tap3, tap4} have not been overwritten.
self.assertEqual(set(['tap1', 'tap2', 'tap3', 'tap4']),
agent.updated_devices)
self.assertEqual(3, log.call_count)
def mock_scan_devices(self, expected, mock_current,
registered_devices, updated_devices):
def _test_scan_devices(self, previous, updated,
fake_current, expected, sync):
self.agent.br_mgr = mock.Mock()
self.agent.br_mgr.get_tap_devices.return_value = mock_current
self.agent.br_mgr.get_tap_devices.return_value = fake_current
results = self.agent.scan_devices(registered_devices, updated_devices)
self.agent.updated_devices = updated
results = self.agent.scan_devices(previous, sync)
self.assertEqual(expected, results)
def test_scan_devices_returns_empty_sets(self):
registered = set()
updated = set()
mock_current = set()
expected = {'current': set(),
'updated': set(),
'added': set(),
'removed': set()}
self.mock_scan_devices(expected, mock_current, registered, updated)
def test_scan_devices_no_changes(self):
registered = set(['tap1', 'tap2'])
updated = set()
mock_current = set(['tap1', 'tap2'])
expected = {'current': set(['tap1', 'tap2']),
previous = {'current': set([1, 2]),
'updated': set(),
'added': set(),
'removed': set()}
self.mock_scan_devices(expected, mock_current, registered, updated)
def test_scan_devices_new_and_removed(self):
registered = set(['tap1', 'tap2'])
fake_current = set([1, 2])
updated = set()
mock_current = set(['tap2', 'tap3'])
expected = {'current': set(['tap2', 'tap3']),
'updated': set(),
'added': set(['tap3']),
'removed': set(['tap1'])}
self.mock_scan_devices(expected, mock_current, registered, updated)
def test_scan_devices_new_updates(self):
registered = set(['tap1'])
updated = set(['tap2'])
mock_current = set(['tap1', 'tap2'])
expected = {'current': set(['tap1', 'tap2']),
'updated': set(['tap2']),
'added': set(['tap2']),
'removed': set()}
self.mock_scan_devices(expected, mock_current, registered, updated)
def test_scan_devices_updated_missing(self):
registered = set(['tap1'])
updated = set(['tap2'])
mock_current = set(['tap1'])
expected = {'current': set(['tap1']),
expected = {'current': set([1, 2]),
'updated': set(),
'added': set(),
'removed': set()}
self.mock_scan_devices(expected, mock_current, registered, updated)
self._test_scan_devices(previous, updated, fake_current, expected,
sync=False)
def test_scan_devices_added_removed(self):
previous = {'current': set([1, 2]),
'updated': set(),
'added': set(),
'removed': set()}
fake_current = set([2, 3])
updated = set()
expected = {'current': set([2, 3]),
'updated': set(),
'added': set([3]),
'removed': set([1])}
self._test_scan_devices(previous, updated, fake_current, expected,
sync=False)
def test_scan_devices_removed_retried_on_sync(self):
previous = {'current': set([2, 3]),
'updated': set(),
'added': set(),
'removed': set([1])}
fake_current = set([2, 3])
updated = set()
expected = {'current': set([2, 3]),
'updated': set(),
'added': set([2, 3]),
'removed': set([1])}
self._test_scan_devices(previous, updated, fake_current, expected,
sync=True)
def test_scan_devices_vanished_removed_on_sync(self):
previous = {'current': set([2, 3]),
'updated': set(),
'added': set(),
'removed': set([1])}
# Device 2 disappeared.
fake_current = set([3])
updated = set()
# Device 1 should be retried.
expected = {'current': set([3]),
'updated': set(),
'added': set([3]),
'removed': set([1, 2])}
self._test_scan_devices(previous, updated, fake_current, expected,
sync=True)
def test_scan_devices_updated(self):
previous = {'current': set([1, 2]),
'updated': set(),
'added': set(),
'removed': set()}
fake_current = set([1, 2])
updated = set([1])
expected = {'current': set([1, 2]),
'updated': set([1]),
'added': set(),
'removed': set()}
self._test_scan_devices(previous, updated, fake_current, expected,
sync=False)
def test_scan_devices_updated_non_existing(self):
previous = {'current': set([1, 2]),
'updated': set(),
'added': set(),
'removed': set()}
fake_current = set([1, 2])
updated = set([3])
expected = {'current': set([1, 2]),
'updated': set(),
'added': set(),
'removed': set()}
self._test_scan_devices(previous, updated, fake_current, expected,
sync=False)
def test_scan_devices_updated_on_sync(self):
previous = {'current': set([1, 2]),
'updated': set([1]),
'added': set(),
'removed': set()}
fake_current = set([1, 2])
updated = set([2])
expected = {'current': set([1, 2]),
'updated': set([1, 2]),
'added': set([1, 2]),
'removed': set()}
self._test_scan_devices(previous, updated, fake_current, expected,
sync=True)
def test_process_network_devices(self):
agent = self.agent