MLNX Agent: Process port_update notifications in the main agent loop

This patch changes the way mlnx agent process port_update notifications.
It does the same for the mlnx agent as was done for the ovs-agent in
I219c6bdf63b0b5e945b655677f9e28fa591f03cd.
Processing a port_update notification directly in the RPC
call may cause competition with the main RPC loop.
To prevent this problem, the actual process of ports updates is done
in the main RPC loop, whereas the RPC call merely adds the updated
port MAC address to a set of updated ports.
port_update notifications received within a single main loop iteration
will be coalesced and processed only once.

Closes-Bug: 1279655
Change-Id: I63dda60cb3cf171e5e9111a1ecf95e45e1d86362
Signed-off-by: Roey Chen <roeyc@mellanox.com>
This commit is contained in:
Roey Chen 2014-05-18 11:44:00 +03:00 committed by roeyc
parent 4d92aa0b59
commit fa5eb301dc
2 changed files with 152 additions and 92 deletions

View File

@ -168,45 +168,10 @@ class MlnxEswitchRpcCallbacks(n_rpc.RpcCallback,
self.eswitch.remove_network(network_id)
def port_update(self, context, **kwargs):
LOG.debug(_("port_update received"))
port = kwargs.get('port')
net_type = kwargs.get('network_type')
segmentation_id = kwargs.get('segmentation_id')
if not segmentation_id:
# compatibility with pre-Havana RPC vlan_id encoding
segmentation_id = kwargs.get('vlan_id')
physical_network = kwargs.get('physical_network')
net_id = port['network_id']
if self.eswitch.vnic_port_exists(port['mac_address']):
if 'security_groups' in port:
self.sg_agent.refresh_firewall()
try:
if port['admin_state_up']:
self.eswitch.port_up(net_id,
net_type,
physical_network,
segmentation_id,
port['id'],
port['mac_address'])
# update plugin about port status
self.agent.plugin_rpc.update_device_up(self.context,
port['mac_address'],
self.agent.agent_id,
cfg.CONF.host)
else:
self.eswitch.port_down(net_id,
physical_network,
port['mac_address'])
# update plugin about port status
self.agent.plugin_rpc.update_device_down(
self.context,
port['mac_address'],
self.agent.agent_id,
cfg.CONF.host)
except n_rpc.MessagingTimeout:
LOG.error(_("RPC timeout while updating port %s"), port['id'])
else:
LOG.debug(_("No port %s defined on agent."), port['id'])
self.agent.add_port_update(port['mac_address'])
LOG.debug("port_update message processed for port with mac %s",
port['mac_address'])
class MlnxEswitchPluginApi(agent_rpc.PluginApi,
@ -229,6 +194,8 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin):
'configurations': configurations,
'agent_type': q_constants.AGENT_TYPE_MLNX,
'start_flag': True}
# Stores port update notifications for processing in main rpc loop
self.updated_ports = set()
self._setup_rpc()
self.init_firewall()
@ -272,24 +239,27 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin):
self._report_state)
heartbeat.start(interval=report_interval)
def update_ports(self, registered_ports):
ports = self.eswitch.get_vnics_mac()
if ports == registered_ports:
return
added = ports - registered_ports
removed = registered_ports - ports
return {'current': ports,
'added': added,
'removed': removed}
def add_port_update(self, port):
self.updated_ports.add(port)
def scan_ports(self, registered_ports, updated_ports_copy=None):
cur_ports = self.eswitch.get_vnics_mac()
port_info = {'current': cur_ports}
# Shouldn't process updates for not existing ports
port_info['updated'] = updated_ports_copy & cur_ports
port_info['added'] = cur_ports - registered_ports
port_info['removed'] = registered_ports - cur_ports
return port_info
def process_network_ports(self, port_info):
resync_a = False
resync_b = False
if port_info.get('added'):
LOG.debug(_("Ports added!"))
resync_a = self.treat_devices_added(port_info['added'])
if port_info.get('removed'):
LOG.debug(_("Ports removed!"))
device_added_updated = port_info['added'] | port_info['updated']
if device_added_updated:
resync_a = self.treat_devices_added_or_updated(
device_added_updated)
if port_info['removed']:
resync_b = self.treat_devices_removed(port_info['removed'])
# If one of the above opertaions fails => resync with plugin
return (resync_a | resync_b)
@ -311,7 +281,7 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin):
else:
LOG.debug(_("No port %s defined on agent."), port_id)
def treat_devices_added(self, devices):
def treat_devices_added_or_updated(self, devices):
try:
devs_details_list = self.plugin_rpc.get_devices_details_list(
self.context,
@ -326,11 +296,11 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin):
for dev_details in devs_details_list:
device = dev_details['device']
LOG.info(_("Adding port with mac %s"), device)
LOG.info(_("Adding or updating port with mac %s"), device)
if 'port_id' in dev_details:
LOG.info(_("Port %s updated"), device)
LOG.debug(_("Device details %s"), str(dev_details))
LOG.debug("Device details %s", str(dev_details))
self.treat_vif_port(dev_details['port_id'],
dev_details['device'],
dev_details['network_id'],
@ -339,12 +309,16 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin):
dev_details['segmentation_id'],
dev_details['admin_state_up'])
if dev_details.get('admin_state_up'):
self.plugin_rpc.update_device_up(self.context,
device,
self.agent_id)
LOG.debug("Setting status for %s to UP", device)
self.plugin_rpc.update_device_up(
self.context, device, self.agent_id)
else:
LOG.debug("Setting status for %s to DOWN", device)
self.plugin_rpc.update_device_down(
self.context, device, self.agent_id)
else:
LOG.debug(_("Device with mac_address %s not defined "
"on Neutron Plugin"), device)
LOG.debug("Device with mac_address %s not defined "
"on Neutron Plugin", device)
return False
def treat_devices_removed(self, devices):
@ -369,27 +343,37 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin):
self.eswitch.port_release(device)
return resync
def _port_info_has_changes(self, port_info):
return (port_info['added'] or
port_info['removed'] or
port_info['updated'])
def daemon_loop(self):
sync = True
ports = set()
updated_ports_copy = set()
LOG.info(_("eSwitch Agent Started!"))
while True:
try:
start = time.time()
if sync:
LOG.info(_("Agent out of sync with plugin!"))
ports.clear()
sync = False
start = time.time()
if sync:
LOG.info(_("Agent out of sync with plugin!"))
ports.clear()
sync = False
port_info = self.update_ports(ports)
# notify plugin about port deltas
if port_info:
LOG.debug(_("Agent loop process devices!"))
# If treat devices fails - must resync with plugin
try:
updated_ports_copy = self.updated_ports
self.updated_ports = set()
port_info = self.scan_ports(ports, updated_ports_copy)
LOG.debug("Agent loop process devices!")
# If treat devices fails - must resync with plugin
ports = port_info['current']
if self._port_info_has_changes(port_info):
LOG.debug("Starting to process devices in:%s", port_info)
# sync with upper/lower layers about port deltas
sync = self.process_network_ports(port_info)
ports = port_info['current']
except exceptions.RequestTimeout:
LOG.exception(_("Request timeout in agent event loop "
"eSwitchD is not responding - exiting..."))
@ -397,6 +381,7 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin):
except Exception:
LOG.exception(_("Error in agent event loop"))
sync = True
self.updated_ports |= updated_ports_copy
# sleep till end of polling interval
elapsed = (time.time() - start)
if (elapsed < self._polling_interval):

View File

@ -45,6 +45,23 @@ class TestEswichManager(base.BaseTestCase):
self.manager.get_port_id_by_mac('no-such-mac')
class TestMlnxEswitchRpcCallbacks(base.BaseTestCase):
def setUp(self):
super(TestMlnxEswitchRpcCallbacks, self).setUp()
agent = mock.Mock()
self.rpc_callbacks = eswitch_neutron_agent.MlnxEswitchRpcCallbacks(
'context',
agent
)
def test_port_update(self):
port = {'mac_address': '10:20:30:40:50:60'}
add_port_update = self.rpc_callbacks.agent.add_port_update
self.rpc_callbacks.port_update('context', port=port)
add_port_update.assert_called_once_with(port['mac_address'])
class TestEswitchAgent(base.BaseTestCase):
def setUp(self):
@ -82,9 +99,9 @@ class TestEswitchAgent(base.BaseTestCase):
mock.patch('neutron.plugins.mlnx.agent.eswitch_neutron_agent.'
'EswitchManager.get_vnics_mac',
return_value=[])):
self.assertTrue(self.agent.treat_devices_added([{}]))
self.assertTrue(self.agent.treat_devices_added_or_updated([{}]))
def _mock_treat_devices_added(self, details, func_name):
def _mock_treat_devices_added_updated(self, details, func_name):
"""Mock treat devices added.
:param details: the details to return for the device
@ -101,14 +118,14 @@ class TestEswitchAgent(base.BaseTestCase):
mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
mock.patch.object(self.agent, func_name)
) as (vnics_fn, get_dev_fn, upd_dev_up, func):
self.assertFalse(self.agent.treat_devices_added([{}]))
self.assertFalse(self.agent.treat_devices_added_or_updated([{}]))
return (func.called, upd_dev_up.called)
def test_treat_devices_added_updates_known_port(self):
details = mock.MagicMock()
details.__contains__.side_effect = lambda x: True
func, dev_up = self._mock_treat_devices_added(details,
'treat_vif_port')
func, dev_up = self._mock_treat_devices_added_updated(details,
'treat_vif_port')
self.assertTrue(func)
self.assertTrue(dev_up)
@ -120,8 +137,8 @@ class TestEswitchAgent(base.BaseTestCase):
'physical_network': 'default',
'segmentation_id': 2,
'admin_state_up': False}
func, dev_up = self._mock_treat_devices_added(details,
'treat_vif_port')
func, dev_up = self._mock_treat_devices_added_updated(details,
'treat_vif_port')
self.assertTrue(func)
self.assertFalse(dev_up)
@ -139,17 +156,75 @@ class TestEswitchAgent(base.BaseTestCase):
self.assertFalse(self.agent.treat_devices_removed([{}]))
self.assertTrue(port_release.called)
def _test_process_network_ports(self, port_info):
with contextlib.nested(
mock.patch.object(self.agent, 'treat_devices_added_or_updated',
return_value=False),
mock.patch.object(self.agent, 'treat_devices_removed',
return_value=False)
) as (device_added_updated, device_removed):
self.assertFalse(self.agent.process_network_ports(port_info))
device_added_updated.assert_called_once_with(
port_info['added'] | port_info['updated'])
device_removed.assert_called_once_with(port_info['removed'])
def test_process_network_ports(self):
current_ports = set(['01:02:03:04:05:06'])
added_ports = set(['10:20:30:40:50:60'])
removed_ports = set(['11:22:33:44:55:66'])
reply = {'current': current_ports,
'removed': removed_ports,
'added': added_ports}
with mock.patch.object(self.agent, 'treat_devices_added',
return_value=False) as device_added:
with mock.patch.object(self.agent, 'treat_devices_removed',
return_value=False) as device_removed:
self.assertFalse(self.agent.process_network_ports(reply))
device_added.assert_called_once_with(added_ports)
device_removed.assert_called_once_with(removed_ports)
self._test_process_network_ports(
{'current': set(['10:20:30:40:50:60']),
'updated': set(),
'added': set(['11:21:31:41:51:61']),
'removed': set(['13:23:33:43:53:63'])})
def test_process_network_ports_with_updated_ports(self):
self._test_process_network_ports(
{'current': set(['10:20:30:40:50:60']),
'updated': set(['12:22:32:42:52:62']),
'added': set(['11:21:31:41:51:61']),
'removed': set(['13:23:33:43:53:63'])})
def test_add_port_update(self):
mac_addr = '10:20:30:40:50:60'
self.agent.add_port_update(mac_addr)
self.assertEqual(set([mac_addr]), self.agent.updated_ports)
def _mock_scan_ports(self, vif_port_set, registered_ports, updated_ports):
with mock.patch.object(self.agent.eswitch, 'get_vnics_mac',
return_value=vif_port_set):
return self.agent.scan_ports(registered_ports, updated_ports)
def test_scan_ports_return_current_for_unchanged_ports(self):
vif_port_set = set([1, 2])
registered_ports = set([1, 2])
actual = self._mock_scan_ports(vif_port_set,
registered_ports, set())
expected = dict(current=vif_port_set, added=set(),
removed=set(), updated=set())
self.assertEqual(expected, actual)
def test_scan_ports_return_port_changes(self):
vif_port_set = set([1, 3])
registered_ports = set([1, 2])
actual = self._mock_scan_ports(vif_port_set,
registered_ports, set())
expected = dict(current=vif_port_set, added=set([3]),
removed=set([2]), updated=set())
self.assertEqual(expected, actual)
def test_scan_ports_with_updated_ports(self):
vif_port_set = set([1, 3, 4])
registered_ports = set([1, 2, 4])
actual = self._mock_scan_ports(vif_port_set,
registered_ports, set([4]))
expected = dict(current=vif_port_set, added=set([3]),
removed=set([2]), updated=set([4]))
self.assertEqual(expected, actual)
def test_scan_ports_with_unknown_updated_ports(self):
vif_port_set = set([1, 3, 4])
registered_ports = set([1, 2, 4])
actual = self._mock_scan_ports(vif_port_set,
registered_ports,
updated_ports=set([4, 5]))
expected = dict(current=vif_port_set, added=set([3]),
removed=set([2]), updated=set([4]))
self.assertEqual(expected, actual)