NEC plugin: Fix nec-agent unexpected exit due to RPC exception

Changes nec-agent to catch RPC exceptions in the main
daemon loop. It fixes the following two bugs.

- nec-agent dies with RPC timeout in secgroup RPC
  (Closes-Bug: #1235106)

- port_added message can be dropped when RPC timeout occurs
  (Closes-Bug: #1235111). Moves exception catch from PluginApi
  to the main daemon loop.

Change-Id: I12394b739fc35bff271543f4625c5e3152e534ae
This commit is contained in:
Akihiro MOTOKI 2013-10-04 15:53:03 +09:00 committed by Gerrit Code Review
parent 45f3dccef1
commit 3df505e448
2 changed files with 88 additions and 19 deletions

View File

@ -54,17 +54,13 @@ class NECPluginApi(agent_rpc.PluginApi):
LOG.info(_("Update ports: added=%(added)s, "
"removed=%(removed)s"),
{'added': port_added, 'removed': port_removed})
try:
self.call(context,
self.make_msg('update_ports',
topic=topics.AGENT,
agent_id=agent_id,
datapath_id=datapath_id,
port_added=port_added,
port_removed=port_removed))
except Exception:
LOG.warn(_("update_ports() failed."))
return
self.call(context,
self.make_msg('update_ports',
topic=topics.AGENT,
agent_id=agent_id,
datapath_id=datapath_id,
port_added=port_added,
port_removed=port_removed))
class NECAgentRpcCallback(object):
@ -126,6 +122,7 @@ class NECNeutronAgent(object):
self.int_br = ovs_lib.OVSBridge(integ_br, root_helper)
self.polling_interval = polling_interval
self.cur_ports = []
self.need_sync = True
self.datapath_id = "0x%s" % self.int_br.get_datapath_id()
@ -194,21 +191,22 @@ class NECNeutronAgent(object):
if port_removed:
self.sg_agent.remove_devices_filter(port_removed)
def daemon_loop(self):
"""Main processing loop for NEC Plugin Agent."""
while True:
def loop_handler(self):
try:
# self.cur_ports will be kept until loop_handler succeeds.
cur_ports = [] if self.need_sync else self.cur_ports
new_ports = []
port_added = []
for vif_port in self.int_br.get_vif_ports():
port_id = vif_port.vif_id
new_ports.append(port_id)
if port_id not in self.cur_ports:
if port_id not in cur_ports:
port_info = self._vif_port_to_port_info(vif_port)
port_added.append(port_info)
port_removed = []
for port_id in self.cur_ports:
for port_id in cur_ports:
if port_id not in new_ports:
port_removed.append(port_id)
@ -221,6 +219,15 @@ class NECNeutronAgent(object):
LOG.debug(_("No port changed."))
self.cur_ports = new_ports
self.need_sync = False
except Exception:
LOG.exception(_("Error in agent event loop"))
self.need_sync = True
def daemon_loop(self):
"""Main processing loop for NEC Plugin Agent."""
while True:
self.loop_handler()
time.sleep(self.polling_interval)

View File

@ -61,6 +61,71 @@ class TestNecAgentBase(base.BaseTestCase):
class TestNecAgent(TestNecAgentBase):
def _setup_mock(self):
vif_ports = [ovs_lib.VifPort('port1', '1', 'id-1', 'mac-1',
self.agent.int_br),
ovs_lib.VifPort('port2', '2', 'id-2', 'mac-2',
self.agent.int_br)]
self.get_vif_ports = mock.patch.object(
ovs_lib.OVSBridge, 'get_vif_ports',
return_value=vif_ports).start()
self.update_ports = mock.patch.object(
nec_neutron_agent.NECPluginApi, 'update_ports').start()
self.prepare_devices_filter = mock.patch.object(
self.agent.sg_agent, 'prepare_devices_filter').start()
self.remove_devices_filter = mock.patch.object(
self.agent.sg_agent, 'remove_devices_filter').start()
def _test_single_loop(self, with_exc=False, need_sync=False):
self.agent.cur_ports = ['id-0', 'id-1']
self.agent.need_sync = need_sync
self.agent.loop_handler()
if with_exc:
self.assertEqual(self.agent.cur_ports, ['id-0', 'id-1'])
self.assertTrue(self.agent.need_sync)
else:
self.assertEqual(self.agent.cur_ports, ['id-1', 'id-2'])
self.assertFalse(self.agent.need_sync)
def test_single_loop_normal(self):
self._setup_mock()
self._test_single_loop()
agent_id = 'nec-q-agent.dummy-host'
self.update_ports.assert_called_once_with(
mock.ANY, agent_id, OVS_DPID_0X,
[{'id': 'id-2', 'mac': 'mac-2', 'port_no': '2'}],
['id-0'])
self.prepare_devices_filter.assert_called_once_with(['id-2'])
self.remove_devices_filter.assert_called_once_with(['id-0'])
def test_single_loop_need_sync(self):
self._setup_mock()
self._test_single_loop(need_sync=True)
agent_id = 'nec-q-agent.dummy-host'
self.update_ports.assert_called_once_with(
mock.ANY, agent_id, OVS_DPID_0X,
[{'id': 'id-1', 'mac': 'mac-1', 'port_no': '1'},
{'id': 'id-2', 'mac': 'mac-2', 'port_no': '2'}],
[])
self.prepare_devices_filter.assert_called_once_with(['id-1', 'id-2'])
self.assertFalse(self.remove_devices_filter.call_count)
def test_single_loop_with_sg_exception_remove(self):
self._setup_mock()
self.update_ports.side_effect = Exception()
self._test_single_loop(with_exc=True)
def test_single_loop_with_sg_exception_prepare(self):
self._setup_mock()
self.prepare_devices_filter.side_effect = Exception()
self._test_single_loop(with_exc=True)
def test_single_loop_with_update_ports_exception(self):
self._setup_mock()
self.remove_devices_filter.side_effect = Exception()
self._test_single_loop(with_exc=True)
def test_daemon_loop(self):
def state_check(index):
@ -278,9 +343,6 @@ class TestNecAgentPluginApi(TestNecAgentBase):
def test_plugin_api(self):
self._test_plugin_api()
def test_plugin_api_fail(self):
self._test_plugin_api(expected_failure=True)
class TestNecAgentMain(base.BaseTestCase):
def test_main(self):