Merge "linuxbridge-agent: process port updates in the main loop"

This commit is contained in:
Jenkins 2014-06-21 00:17:16 +00:00 committed by Gerrit Code Review
commit b56b738a16
2 changed files with 194 additions and 267 deletions

View File

@ -509,16 +509,6 @@ class LinuxBridgeManager:
int_vxlan.link.delete()
LOG.debug(_("Done deleting vxlan interface %s"), interface)
def update_devices(self, registered_devices):
devices = self.get_tap_devices()
if devices == registered_devices:
return
added = devices - registered_devices
removed = registered_devices - devices
return {'current': devices,
'added': added,
'removed': removed}
def get_tap_devices(self):
devices = set()
for device in os.listdir(BRIDGE_FS):
@ -679,57 +669,14 @@ class LinuxBridgeRpcCallbacks(rpc_compat.RpcCallback,
self.agent.br_mgr.delete_vlan_bridge(bridge_name)
def port_update(self, context, **kwargs):
LOG.debug(_("port_update received"))
# Check port exists on node
port = kwargs.get('port')
tap_device_name = self.agent.br_mgr.get_tap_device_name(port['id'])
devices = self.agent.br_mgr.get_tap_devices()
if tap_device_name not in devices:
return
if 'security_groups' in port:
self.sg_agent.refresh_firewall()
try:
if port['admin_state_up']:
network_type = kwargs.get('network_type')
if network_type:
segmentation_id = kwargs.get('segmentation_id')
else:
# compatibility with pre-Havana RPC vlan_id encoding
vlan_id = kwargs.get('vlan_id')
(network_type,
segmentation_id) = lconst.interpret_vlan_id(vlan_id)
physical_network = kwargs.get('physical_network')
# create the networking for the port
if self.agent.br_mgr.add_interface(port['network_id'],
network_type,
physical_network,
segmentation_id,
port['id']):
# update plugin about port status
self.agent.plugin_rpc.update_device_up(self.context,
tap_device_name,
self.agent.agent_id,
cfg.CONF.host)
else:
self.agent.plugin_rpc.update_device_down(
self.context,
tap_device_name,
self.agent.agent_id,
cfg.CONF.host
)
else:
bridge_name = self.agent.br_mgr.get_bridge_name(
port['network_id'])
self.agent.br_mgr.remove_interface(bridge_name,
tap_device_name)
# update plugin about port status
self.agent.plugin_rpc.update_device_down(self.context,
tap_device_name,
self.agent.agent_id,
cfg.CONF.host)
except rpc_compat.MessagingTimeout:
LOG.error(_("RPC timeout while updating port %s"), port['id'])
port_id = kwargs['port']['id']
tap_name = self.agent.br_mgr.get_tap_device_name(port_id)
# Put the tap name in the updated_devices set.
# Do not store port details, as if they're used for processing
# notifications there is no guarantee the notifications are
# processed in the same order as the relevant API requests.
self.agent.updated_devices.add(tap_name)
LOG.debug(_("port_update RPC received for port: %s"), port_id)
def fdb_add(self, context, fdb_entries):
LOG.debug(_("fdb_add received"))
@ -835,6 +782,8 @@ class LinuxBridgeNeutronAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin):
'agent_type': constants.AGENT_TYPE_LINUXBRIDGE,
'start_flag': True}
# stores received port_updates for processing by the main loop
self.updated_devices = set()
self.setup_rpc(interface_mappings.values())
self.init_firewall()
@ -896,18 +845,30 @@ class LinuxBridgeNeutronAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin):
def process_network_devices(self, device_info):
resync_a = False
resync_b = False
if 'added' in device_info:
resync_a = self.treat_devices_added(device_info['added'])
if 'removed' in device_info:
self.prepare_devices_filter(device_info.get('added'))
if device_info.get('updated'):
self.refresh_firewall()
# Updated devices are processed the same as new ones, as their
# admin_state_up may have changed. The set union prevents duplicating
# work when a device is new and updated in the same polling iteration.
devices_added_updated = (set(device_info.get('added'))
| set(device_info.get('updated')))
if devices_added_updated:
resync_a = self.treat_devices_added_updated(devices_added_updated)
if device_info.get('removed'):
resync_b = self.treat_devices_removed(device_info['removed'])
# If one of the above operations fails => resync with plugin
return (resync_a | resync_b)
def treat_devices_added(self, devices):
def treat_devices_added_updated(self, devices):
resync = False
self.prepare_devices_filter(devices)
for device in devices:
LOG.debug(_("Port %s added"), device)
LOG.debug(_("Treating added or updated device: %s"), device)
try:
details = self.plugin_rpc.get_device_details(self.context,
device,
@ -976,6 +937,22 @@ class LinuxBridgeNeutronAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin):
self.br_mgr.remove_empty_bridges()
return resync
def scan_devices(self, registered_devices, updated_devices):
curr_devices = self.br_mgr.get_tap_devices()
device_info = {}
device_info['current'] = curr_devices
device_info['added'] = curr_devices - registered_devices
# we don't want to process updates for devices that don't exist
device_info['updated'] = updated_devices & curr_devices
# we need to clean up after devices are removed
device_info['removed'] = registered_devices - curr_devices
return device_info
def _device_info_has_changes(self, device_info):
return (device_info.get('added')
or device_info.get('updated')
or device_info.get('removed'))
def daemon_loop(self):
sync = True
devices = set()
@ -989,15 +966,16 @@ class LinuxBridgeNeutronAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin):
devices.clear()
sync = False
device_info = {}
# Save updated devices dict to perform rollback in case
# resync would be needed, and then clear self.updated_devices.
# As the greenthread should not yield between these
# two statements, this will should be thread-safe.
updated_devices_copy = self.updated_devices
self.updated_devices = set()
try:
device_info = self.br_mgr.update_devices(devices)
except Exception:
LOG.exception(_("Update devices failed"))
sync = True
try:
# notify plugin about device deltas
if device_info:
LOG.debug(_("Agent loop has new devices!"))
device_info = self.scan_devices(devices, updated_devices_copy)
if self._device_info_has_changes(device_info):
LOG.debug(_("Agent loop found changes! %s"), device_info)
# If treat devices fails - indicates must resync with
# plugin
sync = self.process_network_devices(device_info)
@ -1006,6 +984,10 @@ class LinuxBridgeNeutronAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin):
LOG.exception(_("Error in agent loop. Devices info: %s"),
device_info)
sync = True
# Restore devices that were removed from this set earlier
# without overwriting ones that may have arrived since.
self.updated_devices |= updated_devices_copy
# sleep till end of polling interval
elapsed = (time.time() - start)
if (elapsed < self.polling_interval):

View File

@ -25,7 +25,6 @@ from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
from neutron.common import constants
from neutron.common import exceptions
from neutron.common import rpc_compat
from neutron.plugins.common import constants as p_const
from neutron.plugins.linuxbridge.agent import linuxbridge_neutron_agent
from neutron.plugins.linuxbridge.common import constants as lconst
@ -111,6 +110,9 @@ class TestLinuxBridgeAgent(base.BaseTestCase):
'get_interface_mac')
self.get_mac = self.get_mac_p.start()
self.get_mac.return_value = '00:00:00:00:00:01'
self.agent = linuxbridge_neutron_agent.LinuxBridgeNeutronAgentRPC({},
0,
None)
def test_treat_devices_removed_with_existed_device(self):
agent = linuxbridge_neutron_agent.LinuxBridgeNeutronAgentRPC({},
@ -168,52 +170,146 @@ class TestLinuxBridgeAgent(base.BaseTestCase):
self.assertTrue(fn_udd.called)
self.assertTrue(fn_rdf.called)
def test_update_devices_failed(self):
agent = linuxbridge_neutron_agent.LinuxBridgeNeutronAgentRPC({},
0,
None)
raise_exception = [0]
def test_loop_restores_updated_devices_on_exception(self):
agent = self.agent
agent.updated_devices = set(['tap1', 'tap2'])
def info_mock(msg):
if raise_exception[0] < 2:
raise_exception[0] += 1
else:
raise RuntimeError()
with mock.patch.object(agent.br_mgr,
"update_devices") as update_devices:
update_devices.side_effect = RuntimeError
with mock.patch.object(linuxbridge_neutron_agent.LOG,
'info') as log:
log.side_effect = info_mock
with testtools.ExpectedException(RuntimeError):
with contextlib.nested(
mock.patch.object(agent, 'scan_devices'),
mock.patch.object(linuxbridge_neutron_agent.LOG, 'info'),
mock.patch.object(agent, 'process_network_devices')
) as (scan_devices, log, process_network_devices):
# Simulate effect of 2 port_update()s when loop is running.
# And break out of loop at start of 2nd iteration.
log.side_effect = [agent.updated_devices.add('tap3'),
agent.updated_devices.add('tap4'),
ValueError]
scan_devices.side_effect = RuntimeError
with testtools.ExpectedException(ValueError):
agent.daemon_loop()
# Check that the originals {tap1,tap2} have been restored
# and the new updates {tap3, tap4} have not been overwritten.
self.assertEqual(set(['tap1', 'tap2', 'tap3', 'tap4']),
agent.updated_devices)
self.assertEqual(3, log.call_count)
def test_process_network_devices_failed(self):
device_info = {'current': [1, 2, 3]}
agent = linuxbridge_neutron_agent.LinuxBridgeNeutronAgentRPC({},
0,
None)
raise_exception = [0]
def mock_scan_devices(self, expected, mock_current,
registered_devices, updated_devices):
self.agent.br_mgr = mock.Mock()
self.agent.br_mgr.get_tap_devices.return_value = mock_current
def info_mock(msg):
if raise_exception[0] < 2:
raise_exception[0] += 1
else:
raise RuntimeError()
results = self.agent.scan_devices(registered_devices, updated_devices)
self.assertEqual(expected, results)
with mock.patch.object(agent.br_mgr,
"update_devices") as update_devices:
update_devices.side_effect = device_info
with contextlib.nested(
mock.patch.object(linuxbridge_neutron_agent.LOG, 'info'),
mock.patch.object(agent, 'process_network_devices')
) as (log, process_network_devices):
log.side_effect = info_mock
process_network_devices.side_effect = RuntimeError
with testtools.ExpectedException(RuntimeError):
agent.daemon_loop()
self.assertEqual(3, log.call_count)
def test_scan_devices_returns_empty_sets(self):
registered = set()
updated = set()
mock_current = set()
expected = {'current': set(),
'updated': set(),
'added': set(),
'removed': set()}
self.mock_scan_devices(expected, mock_current, registered, updated)
def test_scan_devices_no_changes(self):
registered = set(['tap1', 'tap2'])
updated = set()
mock_current = set(['tap1', 'tap2'])
expected = {'current': set(['tap1', 'tap2']),
'updated': set(),
'added': set(),
'removed': set()}
self.mock_scan_devices(expected, mock_current, registered, updated)
def test_scan_devices_new_and_removed(self):
registered = set(['tap1', 'tap2'])
updated = set()
mock_current = set(['tap2', 'tap3'])
expected = {'current': set(['tap2', 'tap3']),
'updated': set(),
'added': set(['tap3']),
'removed': set(['tap1'])}
self.mock_scan_devices(expected, mock_current, registered, updated)
def test_scan_devices_new_updates(self):
registered = set(['tap1'])
updated = set(['tap2'])
mock_current = set(['tap1', 'tap2'])
expected = {'current': set(['tap1', 'tap2']),
'updated': set(['tap2']),
'added': set(['tap2']),
'removed': set()}
self.mock_scan_devices(expected, mock_current, registered, updated)
def test_scan_devices_updated_missing(self):
registered = set(['tap1'])
updated = set(['tap2'])
mock_current = set(['tap1'])
expected = {'current': set(['tap1']),
'updated': set(),
'added': set(),
'removed': set()}
self.mock_scan_devices(expected, mock_current, registered, updated)
def test_process_network_devices(self):
agent = self.agent
device_info = {'current': set(),
'added': set(['tap3', 'tap4']),
'updated': set(['tap2', 'tap3']),
'removed': set(['tap1'])}
agent.prepare_devices_filter = mock.Mock()
agent.refresh_firewall = mock.Mock()
agent.treat_devices_added_updated = mock.Mock(return_value=False)
agent.treat_devices_removed = mock.Mock(return_value=False)
agent.process_network_devices(device_info)
agent.prepare_devices_filter.assert_called_with(set(['tap3', 'tap4']))
self.assertTrue(agent.refresh_firewall.called)
agent.treat_devices_added_updated.assert_called_with(set(['tap2',
'tap3',
'tap4']))
agent.treat_devices_removed.assert_called_with(set(['tap1']))
def test_treat_devices_added_updated_admin_state_up_true(self):
agent = self.agent
mock_details = {'port_id': 'port123',
'network_id': 'net123',
'admin_state_up': True,
'network_type': 'vlan',
'segmentation_id': 100,
'physical_network': 'physnet1'}
agent.plugin_rpc = mock.Mock()
agent.plugin_rpc.get_device_details.return_value = mock_details
agent.br_mgr = mock.Mock()
agent.br_mgr.add_interface.return_value = True
resync_needed = agent.treat_devices_added_updated(set(['tap1']))
self.assertFalse(resync_needed)
agent.br_mgr.add_interface.assert_called_with('net123', 'vlan',
'physnet1', 100,
'port123')
self.assertTrue(agent.plugin_rpc.update_device_up.called)
def test_treat_devices_added_updated_admin_state_up_false(self):
mock_details = {'port_id': 'port123',
'network_id': 'net123',
'admin_state_up': False,
'network_type': 'vlan',
'segmentation_id': 100,
'physical_network': 'physnet1'}
self.agent.plugin_rpc = mock.Mock()
self.agent.plugin_rpc.get_device_details.return_value = mock_details
self.agent.remove_port_binding = mock.Mock()
resync_needed = self.agent.treat_devices_added_updated(set(['tap1']))
self.assertFalse(resync_needed)
self.agent.remove_port_binding.assert_called_with('net123', 'port123')
self.assertFalse(self.agent.plugin_rpc.update_device_up.called)
class TestLinuxBridgeManager(base.BaseTestCase):
@ -704,18 +800,6 @@ class TestLinuxBridgeManager(base.BaseTestCase):
self.lbm.delete_vlan("eth1.1")
self.assertTrue(exec_fn.called)
def test_update_devices(self):
with mock.patch.object(self.lbm, "get_tap_devices") as gt_fn:
gt_fn.return_value = set(["dev1"])
self.assertIsNone(self.lbm.update_devices(set(["dev1"])))
gt_fn.return_value = set(["dev1", "dev2"])
self.assertEqual(self.lbm.update_devices(set(["dev2", "dev3"])),
{"current": set(["dev1", "dev2"]),
"added": set(["dev1"]),
"removed": set(["dev3"])
})
def _check_vxlan_support(self, expected, vxlan_module_supported,
vxlan_ucast_supported, vxlan_mcast_supported):
with contextlib.nested(
@ -857,145 +941,6 @@ class TestLinuxBridgeRpcCallbacks(base.BaseTestCase):
get_br_fn.assert_called_with("123")
del_fn.assert_called_with("br0")
def test_port_update(self):
with contextlib.nested(
mock.patch.object(self.lb_rpc.agent.br_mgr,
"get_tap_device_name"),
mock.patch.object(self.lb_rpc.agent.br_mgr,
"get_tap_devices"),
mock.patch.object(self.lb_rpc.agent.br_mgr,
"get_bridge_name"),
mock.patch.object(self.lb_rpc.agent.br_mgr,
"remove_interface"),
mock.patch.object(self.lb_rpc.agent.br_mgr, "add_interface"),
mock.patch.object(self.lb_rpc.agent,
"plugin_rpc", create=True),
mock.patch.object(self.lb_rpc.sg_agent,
"refresh_firewall", create=True)
) as (get_tap_fn, get_tap_devs_fn, getbr_fn, remif_fn,
addif_fn, rpc_obj, reffw_fn):
get_tap_fn.return_value = "tap123"
get_tap_devs_fn.return_value = set(["tap123", "tap124"])
port = {"admin_state_up": True,
"id": "1234-5678",
"network_id": "123-123"}
self.lb_rpc.port_update("unused_context", port=port,
vlan_id="1", physical_network="physnet1")
self.assertFalse(reffw_fn.called)
addif_fn.assert_called_with(port["network_id"], p_const.TYPE_VLAN,
"physnet1", "1", port["id"])
self.lb_rpc.port_update("unused_context", port=port,
network_type=p_const.TYPE_VLAN,
segmentation_id="2",
physical_network="physnet1")
self.assertFalse(reffw_fn.called)
addif_fn.assert_called_with(port["network_id"], p_const.TYPE_VLAN,
"physnet1", "2", port["id"])
self.lb_rpc.port_update("unused_context", port=port,
vlan_id=lconst.FLAT_VLAN_ID,
physical_network="physnet1")
self.assertFalse(reffw_fn.called)
addif_fn.assert_called_with(port["network_id"], p_const.TYPE_FLAT,
"physnet1", None, port["id"])
self.lb_rpc.port_update("unused_context", port=port,
network_type=p_const.TYPE_FLAT,
segmentation_id=None,
physical_network="physnet1")
self.assertFalse(reffw_fn.called)
addif_fn.assert_called_with(port["network_id"], p_const.TYPE_FLAT,
"physnet1", None, port["id"])
self.lb_rpc.port_update("unused_context", port=port,
vlan_id=lconst.LOCAL_VLAN_ID,
physical_network=None)
self.assertFalse(reffw_fn.called)
addif_fn.assert_called_with(port["network_id"], p_const.TYPE_LOCAL,
None, None, port["id"])
self.lb_rpc.port_update("unused_context", port=port,
network_type=p_const.TYPE_LOCAL,
segmentation_id=None,
physical_network=None)
self.assertFalse(reffw_fn.called)
addif_fn.assert_called_with(port["network_id"], p_const.TYPE_LOCAL,
None, None, port["id"])
addif_fn.return_value = True
self.lb_rpc.port_update("unused_context", port=port,
network_type=p_const.TYPE_LOCAL,
segmentation_id=None,
physical_network=None)
rpc_obj.update_device_up.assert_called_with(
self.lb_rpc.context,
"tap123",
self.lb_rpc.agent.agent_id,
cfg.CONF.host
)
addif_fn.return_value = False
self.lb_rpc.port_update("unused_context", port=port,
network_type=p_const.TYPE_LOCAL,
segmentation_id=None,
physical_network=None)
rpc_obj.update_device_down.assert_called_with(
self.lb_rpc.context,
"tap123",
self.lb_rpc.agent.agent_id,
cfg.CONF.host
)
port["admin_state_up"] = False
port["security_groups"] = True
getbr_fn.return_value = "br0"
self.lb_rpc.port_update("unused_context", port=port,
vlan_id="1", physical_network="physnet1")
self.assertTrue(reffw_fn.called)
remif_fn.assert_called_with("br0", "tap123")
rpc_obj.update_device_down.assert_called_with(
self.lb_rpc.context,
"tap123",
self.lb_rpc.agent.agent_id,
cfg.CONF.host
)
def test_port_update_plugin_rpc_failed(self):
with contextlib.nested(
mock.patch.object(self.lb_rpc.agent.br_mgr,
"get_tap_device_name"),
mock.patch.object(self.lb_rpc.agent.br_mgr,
"get_tap_devices"),
mock.patch.object(self.lb_rpc.agent.br_mgr,
"get_bridge_name"),
mock.patch.object(self.lb_rpc.agent.br_mgr,
"remove_interface"),
mock.patch.object(self.lb_rpc.agent.br_mgr, "add_interface"),
mock.patch.object(self.lb_rpc.sg_agent,
"refresh_firewall", create=True),
mock.patch.object(self.lb_rpc.agent,
"plugin_rpc", create=True),
mock.patch.object(linuxbridge_neutron_agent.LOG, 'error'),
) as (get_tap_fn, get_tap_devs_fn, _, _, _, _, plugin_rpc, log):
get_tap_fn.return_value = "tap123"
get_tap_devs_fn.return_value = set(["tap123", "tap124"])
port = {"admin_state_up": True,
"id": "1234-5678",
"network_id": "123-123"}
timeout_class = rpc_compat.MessagingTimeout
plugin_rpc.update_device_up.side_effect = timeout_class
self.lb_rpc.port_update(mock.Mock(), port=port)
self.assertTrue(plugin_rpc.update_device_up.called)
self.assertEqual(log.call_count, 1)
log.reset_mock()
port["admin_state_up"] = False
plugin_rpc.update_device_down.side_effect = timeout_class
self.lb_rpc.port_update(mock.Mock(), port=port)
self.assertTrue(plugin_rpc.update_device_down.called)
self.assertEqual(log.call_count, 1)
def test_fdb_add(self):
fdb_entries = {'net_id':
{'ports':