OFAgent: Process port_update notifications in the main agent loop

Port the following patch to OFAgent.
commit: 5e6e592132
https://review.openstack.org/#/c/61964/

Partial-Bug: 1293265

Change-Id: I53813d12c66dc746cd373fd91ff9bd9bdbf222db
This commit is contained in:
fumihiko kakuma 2014-03-16 13:13:03 +09:00
parent 9a1af60ae0
commit 34e3be16a1
2 changed files with 228 additions and 163 deletions

View File

@ -36,10 +36,8 @@ from neutron.common import constants as n_const
from neutron.common import topics
from neutron.common import utils as n_utils
from neutron import context
from neutron.extensions import securitygroup as ext_sg
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
from neutron.openstack.common.rpc import common as rpc_common
from neutron.openstack.common.rpc import dispatcher
from neutron.plugins.common import constants as p_const
from neutron.plugins.ofagent.common import config # noqa
@ -277,6 +275,8 @@ class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
self.sg_agent = OFASecurityGroupAgent(self.context,
self.plugin_rpc,
self.root_helper)
# Stores port update notifications for processing in main loop
self.updated_ports = set()
# Initialize iteration counter
self.iter_num = 0
@ -353,33 +353,12 @@ class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
def port_update(self, context, **kwargs):
port = kwargs.get('port')
# Put the port identifier in the updated_ports set.
# Even if full port details might be provided to this call,
# they are not used since there is no guarantee the notifications
# are processed in the same order as the relevant API requests
self.updated_ports.add(port['id'])
LOG.debug(_("port_update received port %s"), port['id'])
# Validate that port is on OVS
vif_port = self.int_br.get_vif_port_by_id(port['id'])
if not vif_port:
return
if ext_sg.SECURITYGROUPS in port:
self.sg_agent.refresh_firewall()
network_type = kwargs.get('network_type')
segmentation_id = kwargs.get('segmentation_id')
physical_network = kwargs.get('physical_network')
self.treat_vif_port(vif_port, port['id'], port['network_id'],
network_type, physical_network,
segmentation_id, port['admin_state_up'])
try:
if port['admin_state_up']:
# update plugin about port status
self.plugin_rpc.update_device_up(self.context, port['id'],
self.agent_id,
cfg.CONF.host)
else:
# update plugin about port status
self.plugin_rpc.update_device_down(self.context, port['id'],
self.agent_id,
cfg.CONF.host)
except rpc_common.Timeout:
LOG.error(_("RPC timeout while updating port %s"), port['id'])
def tunnel_update(self, context, **kwargs):
LOG.debug(_("tunnel_update received"))
@ -973,16 +952,27 @@ class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
self._phys_br_patch_physical_bridge_with_integration_bridge(
br, physical_network, bridge, ip_wrapper)
def update_ports(self, registered_ports):
ports = self.int_br.get_vif_port_set()
if ports == registered_ports:
return
self.int_br_device_count = len(ports)
added = ports - registered_ports
removed = registered_ports - ports
return {'current': ports,
'added': added,
'removed': removed}
def scan_ports(self, registered_ports, updated_ports=None):
cur_ports = self.int_br.get_vif_port_set()
self.int_br_device_count = len(cur_ports)
port_info = {'current': cur_ports}
if updated_ports:
# Some updated ports might have been removed in the
# meanwhile, and therefore should not be processed.
# In this case the updated port won't be found among
# current ports.
updated_ports &= cur_ports
if updated_ports:
port_info['updated'] = updated_ports
if cur_ports == registered_ports:
# No added or removed ports to set, just return here
return port_info
port_info['added'] = cur_ports - registered_ports
# Remove all the known ports not found on the integration bridge
port_info['removed'] = registered_ports - cur_ports
return port_info
def update_ancillary_ports(self, registered_ports):
ports = set()
@ -1066,11 +1056,10 @@ class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
self.ryu_send_msg(msg)
return ofport
def treat_devices_added(self, devices):
def treat_devices_added_or_updated(self, devices):
resync = False
self.sg_agent.prepare_devices_filter(devices)
for device in devices:
LOG.info(_("Port %s added"), device)
LOG.debug(_("Processing port %s"), device)
try:
details = self.plugin_rpc.get_device_details(self.context,
device,
@ -1093,12 +1082,17 @@ class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
details['admin_state_up'])
# update plugin about port status
self.plugin_rpc.update_device_up(self.context,
device,
self.agent_id,
cfg.CONF.host)
if details.get('admin_state_up'):
LOG.debug(_("Setting status for %s to UP"), device)
self.plugin_rpc.update_device_up(
self.context, device, self.agent_id, cfg.CONF.host)
else:
LOG.debug(_("Setting status for %s to DOWN"), device)
self.plugin_rpc.update_device_down(
self.context, device, self.agent_id, cfg.CONF.host)
LOG.info(_("Configuration for device %s completed."), device)
else:
LOG.debug(_("Device %s not defined on plugin"), device)
LOG.warn(_("Device %s not defined on plugin"), device)
if (port and int(port.ofport) != -1):
self.port_dead(port)
return resync
@ -1166,11 +1160,25 @@ class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
def process_network_ports(self, port_info):
resync_add = False
resync_removed = False
if 'added' in port_info:
# If there is an exception while processing security groups ports
# will not be wired anyway, and a resync will be triggered
self.sg_agent.prepare_devices_filter(port_info.get('added', set()))
if port_info.get('updated'):
self.sg_agent.refresh_firewall()
# VIF wiring needs to be performed always for 'new' devices.
# For updated ports, re-wiring is not needed in most cases, but needs
# to be performed anyway when the admin state of a device is changed.
# A device might be both in the 'added' and 'updated'
# list at the same time; avoid processing it twice.
devices_added_updated = (port_info.get('added', set()) |
port_info.get('updated', set()))
if devices_added_updated:
start = time.time()
resync_add = self.treat_devices_added(port_info['added'])
resync_add = self.treat_devices_added_or_updated(
devices_added_updated)
LOG.debug(_("process_network_ports - iteration:%(iter_num)d - "
"treat_devices_added completed in %(elapsed).3f"),
"treat_devices_added_or_updated completed "
"in %(elapsed).3f"),
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
if 'removed' in port_info:
@ -1230,40 +1238,60 @@ class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
resync = True
return resync
def _agent_has_updates(self, polling_manager):
return (polling_manager.is_polling_required or
self.updated_ports)
def _port_info_has_changes(self, port_info):
return (port_info.get('added') or
port_info.get('removed') or
port_info.get('updated'))
def ovsdb_monitor_loop(self, polling_manager=None):
if not polling_manager:
polling_manager = polling.AlwaysPoll()
sync = True
ports = set()
updated_ports_copy = set()
ancillary_ports = set()
tunnel_sync = True
while True:
try:
start = time.time()
port_stats = {'regular': {'added': 0, 'removed': 0},
'ancillary': {'added': 0, 'removed': 0}}
LOG.debug(_("Agent ovsdb_monitor_loop - "
"iteration:%d started"),
self.iter_num)
if sync:
LOG.info(_("Agent out of sync with plugin!"))
ports.clear()
ancillary_ports.clear()
sync = False
polling_manager.force_polling()
# Notify the plugin of tunnel IP
if self.enable_tunneling and tunnel_sync:
LOG.info(_("Agent tunnel out of sync with plugin!"))
start = time.time()
port_stats = {'regular': {'added': 0, 'updated': 0, 'removed': 0},
'ancillary': {'added': 0, 'removed': 0}}
LOG.debug(_("Agent ovsdb_monitor_loop - "
"iteration:%d started"),
self.iter_num)
if sync:
LOG.info(_("Agent out of sync with plugin!"))
ports.clear()
ancillary_ports.clear()
sync = False
polling_manager.force_polling()
# Notify the plugin of tunnel IP
if self.enable_tunneling and tunnel_sync:
LOG.info(_("Agent tunnel out of sync with plugin!"))
try:
tunnel_sync = self.tunnel_sync()
if polling_manager.is_polling_required:
except Exception:
LOG.exception(_("Error while synchronizing tunnels"))
tunnel_sync = True
if self._agent_has_updates(polling_manager):
try:
LOG.debug(_("Agent ovsdb_monitor_loop - "
"iteration:%(iter_num)d - "
"starting polling. Elapsed:%(elapsed).3f"),
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
port_info = self.update_ports(ports)
# Save updated ports dict to perform rollback in
# case resync would be needed, and then clear
# self.updated_ports. As the greenthread should not yield
# between these two statements, this will be thread-safe
updated_ports_copy = self.updated_ports
self.updated_ports = set()
port_info = self.scan_ports(ports, updated_ports_copy)
ports = port_info['current']
LOG.debug(_("Agent ovsdb_monitor_loop - "
"iteration:%(iter_num)d - "
"port information retrieved. "
@ -1271,8 +1299,9 @@ class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
# notify plugin about port deltas
if port_info:
LOG.debug(_("Agent loop has new devices!"))
if self._port_info_has_changes(port_info):
LOG.debug(_("Starting to process devices in:%s"),
port_info)
# If treat devices fails - must resync with plugin
sync = self.process_network_ports(port_info)
LOG.debug(_("Agent ovsdb_monitor_loop - "
@ -1280,9 +1309,10 @@ class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
"ports processed. Elapsed:%(elapsed).3f"),
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
ports = port_info['current']
port_stats['regular']['added'] = (
len(port_info.get('added', [])))
port_stats['regular']['updated'] = (
len(port_info.get('updated', [])))
port_stats['regular']['removed'] = (
len(port_info.get('removed', [])))
# Treat ancillary devices if they exist
@ -1313,11 +1343,11 @@ class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
sync = sync | rc
polling_manager.polling_completed()
except Exception:
LOG.exception(_("Error in agent event loop"))
sync = True
tunnel_sync = True
except Exception:
LOG.exception(_("Error while processing VIF ports"))
# Put the ports back in self.updated_port
self.updated_ports |= updated_ports_copy
sync = True
# sleep till end of polling interval
elapsed = (time.time() - start)

View File

@ -26,7 +26,6 @@ import testtools
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
from neutron.openstack.common import importutils
from neutron.openstack.common.rpc import common as rpc_common
from neutron.plugins.common import constants as p_const
from neutron.plugins.openvswitch.common import constants
from neutron.tests import base
@ -310,28 +309,69 @@ class TestOFANeutronAgent(OFAAgentTestCase):
def test_port_dead_with_port_already_dead(self):
self._test_port_dead(self.mod_agent.DEAD_VLAN_TAG)
def mock_update_ports(self, vif_port_set=None, registered_ports=None):
def mock_scan_ports(self, vif_port_set=None, registered_ports=None,
updated_ports=None):
with mock.patch.object(self.agent.int_br, 'get_vif_port_set',
return_value=vif_port_set):
return self.agent.update_ports(registered_ports)
return self.agent.scan_ports(registered_ports, updated_ports)
def test_update_ports_returns_none_for_unchanged_ports(self):
self.assertIsNone(self.mock_update_ports())
def test_scan_ports_returns_current_only_for_unchanged_ports(self):
vif_port_set = set([1, 3])
registered_ports = set([1, 3])
expected = {'current': vif_port_set}
actual = self.mock_scan_ports(vif_port_set, registered_ports)
self.assertEqual(expected, actual)
def test_update_ports_returns_port_changes(self):
def test_scan_ports_returns_port_changes(self):
vif_port_set = set([1, 3])
registered_ports = set([1, 2])
expected = dict(current=vif_port_set, added=set([3]), removed=set([2]))
actual = self.mock_update_ports(vif_port_set, registered_ports)
actual = self.mock_scan_ports(vif_port_set, registered_ports)
self.assertEqual(expected, actual)
def _test_scan_ports_with_updated_ports(self, updated_ports):
vif_port_set = set([1, 3, 4])
registered_ports = set([1, 2, 4])
expected = dict(current=vif_port_set, added=set([3]),
removed=set([2]), updated=set([4]))
actual = self.mock_scan_ports(vif_port_set, registered_ports,
updated_ports)
self.assertEqual(expected, actual)
def test_scan_ports_finds_known_updated_ports(self):
self._test_scan_ports_with_updated_ports(set([4]))
def test_scan_ports_ignores_unknown_updated_ports(self):
# the port '5' was not seen on current ports. Hence it has either
# never been wired or already removed and should be ignored
self._test_scan_ports_with_updated_ports(set([4, 5]))
def test_scan_ports_ignores_updated_port_if_removed(self):
vif_port_set = set([1, 3])
registered_ports = set([1, 2])
updated_ports = set([1, 2])
expected = dict(current=vif_port_set, added=set([3]),
removed=set([2]), updated=set([1]))
actual = self.mock_scan_ports(vif_port_set, registered_ports,
updated_ports)
self.assertEqual(expected, actual)
def test_scan_ports_no_vif_changes_returns_updated_port_only(self):
vif_port_set = set([1, 2, 3])
registered_ports = set([1, 2, 3])
updated_ports = set([2])
expected = dict(current=vif_port_set, updated=set([2]))
actual = self.mock_scan_ports(vif_port_set, registered_ports,
updated_ports)
self.assertEqual(expected, actual)
def test_treat_devices_added_returns_true_for_missing_device(self):
with mock.patch.object(self.agent.plugin_rpc, 'get_device_details',
side_effect=Exception()):
self.assertTrue(self.agent.treat_devices_added([{}]))
self.assertTrue(self.agent.treat_devices_added_or_updated([{}]))
def _mock_treat_devices_added(self, details, port, func_name):
"""Mock treat devices added.
def _mock_treat_devices_added_updated(self, details, port, func_name):
"""Mock treat devices added or updated.
:param details: the details to return for the device
:param port: the port that get_vif_port_by_id should return
@ -344,29 +384,51 @@ class TestOFANeutronAgent(OFAAgentTestCase):
mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
return_value=port),
mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
mock.patch.object(self.agent, func_name)
) as (get_dev_fn, get_vif_func, upd_dev_up, func):
self.assertFalse(self.agent.treat_devices_added([{}]))
) as (get_dev_fn, get_vif_func, upd_dev_up, upd_dev_down, func):
self.assertFalse(self.agent.treat_devices_added_or_updated([{}]))
return func.called
def test_treat_devices_added_ignores_invalid_ofport(self):
def test_treat_devices_added_updated_ignores_invalid_ofport(self):
port = mock.Mock()
port.ofport = -1
self.assertFalse(self._mock_treat_devices_added(mock.MagicMock(), port,
'port_dead'))
self.assertFalse(self._mock_treat_devices_added_updated(
mock.MagicMock(), port, 'port_dead'))
def test_treat_devices_added_marks_unknown_port_as_dead(self):
def test_treat_devices_added_updated_marks_unknown_port_as_dead(self):
port = mock.Mock()
port.ofport = 1
self.assertTrue(self._mock_treat_devices_added(mock.MagicMock(), port,
'port_dead'))
self.assertTrue(self._mock_treat_devices_added_updated(
mock.MagicMock(), port, 'port_dead'))
def test_treat_devices_added_updates_known_port(self):
def test_treat_devices_added_updated_updates_known_port(self):
details = mock.MagicMock()
details.__contains__.side_effect = lambda x: True
self.assertTrue(self._mock_treat_devices_added(details,
mock.Mock(),
'treat_vif_port'))
self.assertTrue(self._mock_treat_devices_added_updated(
details, mock.Mock(), 'treat_vif_port'))
def test_treat_devices_added_updated_put_port_down(self):
fake_details_dict = {'admin_state_up': False,
'port_id': 'xxx',
'device': 'xxx',
'network_id': 'yyy',
'physical_network': 'foo',
'segmentation_id': 'bar',
'network_type': 'baz'}
with contextlib.nested(
mock.patch.object(self.agent.plugin_rpc, 'get_device_details',
return_value=fake_details_dict),
mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
return_value=mock.MagicMock()),
mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
mock.patch.object(self.agent, 'treat_vif_port')
) as (get_dev_fn, get_vif_func, upd_dev_up,
upd_dev_down, treat_vif_port):
self.assertFalse(self.agent.treat_devices_added_or_updated([{}]))
self.assertTrue(treat_vif_port.called)
self.assertTrue(upd_dev_down.called)
def test_treat_devices_removed_returns_true_for_missing_device(self):
with mock.patch.object(self.agent.plugin_rpc, 'update_device_down',
@ -387,17 +449,36 @@ class TestOFANeutronAgent(OFAAgentTestCase):
def test_treat_devices_removed_ignores_missing_port(self):
self._mock_treat_devices_removed(False)
def _test_process_network_ports(self, port_info):
with contextlib.nested(
mock.patch.object(self.agent.sg_agent, "prepare_devices_filter"),
mock.patch.object(self.agent.sg_agent, "refresh_firewall"),
mock.patch.object(self.agent, "treat_devices_added_or_updated",
return_value=False),
mock.patch.object(self.agent, "treat_devices_removed",
return_value=False)
) as (prep_dev_filter, refresh_fw,
device_added_updated, device_removed):
self.assertFalse(self.agent.process_network_ports(port_info))
prep_dev_filter.assert_called_once_with(port_info['added'])
if port_info.get('updated'):
self.assertEqual(1, refresh_fw.call_count)
device_added_updated.assert_called_once_with(
port_info['added'] | port_info.get('updated', set()))
device_removed.assert_called_once_with(port_info['removed'])
def test_process_network_ports(self):
reply = {'current': set(['tap0']),
'removed': set(['eth0']),
'added': set(['eth1'])}
with mock.patch.object(self.agent, 'treat_devices_added',
return_value=False) as device_added:
with mock.patch.object(self.agent, 'treat_devices_removed',
return_value=False) as device_removed:
self.assertFalse(self.agent.process_network_ports(reply))
device_added.assert_called_once_with(set(['eth1']))
device_removed.assert_called_once_with(set(['eth0']))
self._test_process_network_ports(
{'current': set(['tap0']),
'removed': set(['eth0']),
'added': set(['eth1'])})
def test_process_network_port_with_updated_ports(self):
self._test_process_network_ports(
{'current': set(['tap0', 'tap1']),
'updated': set(['tap1', 'eth1']),
'removed': set(['eth0']),
'added': set(['eth1'])})
def test_report_state(self):
with mock.patch.object(self.agent.state_rpc,
@ -424,61 +505,15 @@ class TestOFANeutronAgent(OFAAgentTestCase):
recl_fn.assert_called_with("123")
def test_port_update(self):
with contextlib.nested(
mock.patch.object(self.agent.int_br, "get_vif_port_by_id"),
mock.patch.object(self.agent, "treat_vif_port"),
mock.patch.object(self.agent.plugin_rpc, "update_device_up"),
mock.patch.object(self.agent.plugin_rpc, "update_device_down")
) as (getvif_fn, treatvif_fn, updup_fn, upddown_fn):
port = {"id": "123",
"network_id": "124",
"admin_state_up": False}
getvif_fn.return_value = "vif_port_obj"
self.agent.port_update("unused_context",
port=port,
network_type="vlan",
segmentation_id="1",
physical_network="physnet")
treatvif_fn.assert_called_with("vif_port_obj", "123",
"124", "vlan", "physnet",
"1", False)
upddown_fn.assert_called_with(self.agent.context,
"123", self.agent.agent_id,
cfg.CONF.host)
port["admin_state_up"] = True
self.agent.port_update("unused_context",
port=port,
network_type="vlan",
segmentation_id="1",
physical_network="physnet")
updup_fn.assert_called_with(self.agent.context,
"123", self.agent.agent_id,
cfg.CONF.host)
def test_port_update_plugin_rpc_failed(self):
port = {'id': 1,
'network_id': 1,
'admin_state_up': True}
with contextlib.nested(
mock.patch.object(self.mod_agent.LOG, 'error'),
mock.patch.object(self.agent.int_br, "get_vif_port_by_id"),
mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
mock.patch.object(self.agent, 'port_bound'),
mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
mock.patch.object(self.agent, 'port_dead')
) as (log, _, device_up, _, device_down, _):
device_up.side_effect = rpc_common.Timeout
self.agent.port_update(mock.Mock(), port=port)
self.assertTrue(device_up.called)
self.assertEqual(log.call_count, 1)
log.reset_mock()
port['admin_state_up'] = False
device_down.side_effect = rpc_common.Timeout
self.agent.port_update(mock.Mock(), port=port)
self.assertTrue(device_down.called)
self.assertEqual(log.call_count, 1)
port = {"id": "123",
"network_id": "124",
"admin_state_up": False}
self.agent.port_update("unused_context",
port=port,
network_type="vlan",
segmentation_id="1",
physical_network="physnet")
self.assertEqual(set(['123']), self.agent.updated_ports)
def test_setup_physical_bridges(self):
with contextlib.nested(