Move check_ha_state_for_router() into notification code
As soon as we call router_info.initialize(), we could
possibly try and process a router. If it is HA, and
we have not fully initialized the HA port or keepalived
manager, we could trigger an exception.
Move the call to check_ha_state_for_router() into the
update notification code so it's done after the router
has been created. Updated the functional tests for this
since the unit tests are now invalid.
Also added a retry counter to the RouterUpdate object so
the l3-agent code will stop re-enqueuing the same update
in an infinite loop. We will delete the router if the
limit is reached.
Finally, have the L3 HA code verify that ha_port and
keepalived_manager objects are valid during deletion since
there is no need to do additional work if they are not.
Change-Id: Iae65305cbc04b7af482032ddf06b6f2162a9c862
Closes-bug: #1726370
(cherry picked from commit d2b909f533)
This commit is contained in:
parent
bd64409bbb
commit
6809a6dd77
|
@ -483,6 +483,13 @@ class L3NATAgent(ha.AgentMixin,
|
|||
self.l3_ext_manager.add_router(self.context, router)
|
||||
|
||||
def _process_updated_router(self, router):
|
||||
is_dvr_only_agent = (self.conf.agent_mode in
|
||||
[lib_const.L3_AGENT_MODE_DVR,
|
||||
l3_constants.L3_AGENT_MODE_DVR_NO_EXTERNAL])
|
||||
# For HA routers check that DB state matches actual state
|
||||
if router.get('ha') and not is_dvr_only_agent:
|
||||
self.check_ha_state_for_router(
|
||||
router['id'], router.get(l3_constants.HA_ROUTER_STATE_KEY))
|
||||
ri = self.router_info[router['id']]
|
||||
ri.router = router
|
||||
registry.notify(resources.ROUTER, events.BEFORE_UPDATE,
|
||||
|
@ -493,6 +500,14 @@ class L3NATAgent(ha.AgentMixin,
|
|||
|
||||
def _resync_router(self, router_update,
|
||||
priority=queue.PRIORITY_SYNC_ROUTERS_TASK):
|
||||
# Don't keep trying to resync if it's failing
|
||||
if router_update.hit_retry_limit():
|
||||
LOG.warning("Hit retry limit with router update for %s, action %s",
|
||||
router_update.id, router_update.action)
|
||||
if router_update.action != queue.DELETE_ROUTER:
|
||||
LOG.debug("Deleting router %s", router_update.id)
|
||||
self._safe_router_removed(router_update.id)
|
||||
return
|
||||
router_update.timestamp = timeutils.utcnow()
|
||||
router_update.priority = priority
|
||||
router_update.router = None # Force the agent to resync the router
|
||||
|
@ -591,9 +606,6 @@ class L3NATAgent(ha.AgentMixin,
|
|||
chunk = []
|
||||
is_snat_agent = (self.conf.agent_mode ==
|
||||
lib_const.L3_AGENT_MODE_DVR_SNAT)
|
||||
is_dvr_only_agent = (self.conf.agent_mode in
|
||||
[lib_const.L3_AGENT_MODE_DVR,
|
||||
l3_constants.L3_AGENT_MODE_DVR_NO_EXTERNAL])
|
||||
try:
|
||||
router_ids = self.plugin_rpc.get_router_ids(context)
|
||||
# We set HA network port status to DOWN to let l2 agent update it
|
||||
|
@ -617,10 +629,6 @@ class L3NATAgent(ha.AgentMixin,
|
|||
ns_manager.keep_ext_net(ext_net_id)
|
||||
elif is_snat_agent and not r.get('ha'):
|
||||
ns_manager.ensure_snat_cleanup(r['id'])
|
||||
# For HA routers check that DB state matches actual state
|
||||
if r.get('ha') and not is_dvr_only_agent:
|
||||
self.check_ha_state_for_router(
|
||||
r['id'], r.get(l3_constants.HA_ROUTER_STATE_KEY))
|
||||
update = queue.RouterUpdate(
|
||||
r['id'],
|
||||
queue.PRIORITY_SYNC_ROUTERS_TASK,
|
||||
|
|
|
@ -163,6 +163,10 @@ class HaRouter(router.RouterInfo):
|
|||
self.keepalived_manager.spawn()
|
||||
|
||||
def disable_keepalived(self):
|
||||
if not self.keepalived_manager:
|
||||
LOG.debug('Error while disabling keepalived for %s - no manager',
|
||||
self.router_id)
|
||||
return
|
||||
self.keepalived_manager.disable()
|
||||
conf_dir = self.keepalived_manager.get_conf_dir()
|
||||
shutil.rmtree(conf_dir)
|
||||
|
@ -192,6 +196,10 @@ class HaRouter(router.RouterInfo):
|
|||
preserve_ips=[self._get_primary_vip()])
|
||||
|
||||
def ha_network_removed(self):
|
||||
if not self.ha_port:
|
||||
LOG.debug('Error while removing HA network for %s - no port',
|
||||
self.router_id)
|
||||
return
|
||||
self.driver.unplug(self.get_ha_device_name(),
|
||||
namespace=self.ha_namespace,
|
||||
prefix=HA_DEV_PREFIX)
|
||||
|
@ -362,6 +370,10 @@ class HaRouter(router.RouterInfo):
|
|||
self.router_id, IP_MONITOR_PROCESS_SERVICE, pm)
|
||||
|
||||
def destroy_state_change_monitor(self, process_monitor):
|
||||
if not self.ha_port:
|
||||
LOG.debug('Error while destroying state change monitor for %s - '
|
||||
'no port', self.router_id)
|
||||
return
|
||||
pm = self._get_state_change_monitor_process_manager()
|
||||
process_monitor.unregister(
|
||||
self.router_id, IP_MONITOR_PROCESS_SERVICE)
|
||||
|
|
|
@ -33,7 +33,7 @@ class RouterUpdate(object):
|
|||
and process a request to update a router.
|
||||
"""
|
||||
def __init__(self, router_id, priority,
|
||||
action=None, router=None, timestamp=None):
|
||||
action=None, router=None, timestamp=None, tries=5):
|
||||
self.priority = priority
|
||||
self.timestamp = timestamp
|
||||
if not timestamp:
|
||||
|
@ -41,6 +41,7 @@ class RouterUpdate(object):
|
|||
self.id = router_id
|
||||
self.action = action
|
||||
self.router = router
|
||||
self.tries = tries
|
||||
|
||||
def __lt__(self, other):
|
||||
"""Implements priority among updates
|
||||
|
@ -57,6 +58,9 @@ class RouterUpdate(object):
|
|||
return self.timestamp < other.timestamp
|
||||
return self.id < other.id
|
||||
|
||||
def hit_retry_limit(self):
|
||||
return self.tries < 0
|
||||
|
||||
|
||||
class ExclusiveRouterProcessor(object):
|
||||
"""Manager for access to a router for processing
|
||||
|
@ -143,6 +147,7 @@ class RouterProcessingQueue(object):
|
|||
self._queue = Queue.PriorityQueue()
|
||||
|
||||
def add(self, update):
|
||||
update.tries -= 1
|
||||
self._queue.put(update)
|
||||
|
||||
def each_update_to_next_router(self):
|
||||
|
|
|
@ -32,7 +32,7 @@ from neutron.tests.functional.agent.l3 import framework
|
|||
|
||||
class L3AgentTestCase(framework.L3AgentTestFramework):
|
||||
|
||||
def test_agent_notifications_for_router_events(self):
|
||||
def _test_agent_notifications_for_router_events(self, enable_ha=False):
|
||||
"""Test notifications for router create, update, and delete.
|
||||
|
||||
Make sure that when the agent sends notifications of router events
|
||||
|
@ -53,10 +53,14 @@ class L3AgentTestCase(framework.L3AgentTestFramework):
|
|||
registry.subscribe(event_handler,
|
||||
resources.ROUTER, events.AFTER_DELETE)
|
||||
|
||||
router_info = self.generate_router_info(enable_ha=False)
|
||||
router_info = self.generate_router_info(enable_ha=enable_ha)
|
||||
router = self.manage_router(self.agent, router_info)
|
||||
with mock.patch.object(self.agent,
|
||||
'check_ha_state_for_router') as check:
|
||||
self.agent._process_updated_router(router.router)
|
||||
self._delete_router(self.agent, router.router_id)
|
||||
if enable_ha:
|
||||
check.assert_called_once_with(router.router_id, None)
|
||||
|
||||
expected_calls = [
|
||||
mock.call('router', 'before_create', self.agent, router=router),
|
||||
|
@ -67,6 +71,12 @@ class L3AgentTestCase(framework.L3AgentTestFramework):
|
|||
mock.call('router', 'after_delete', self.agent, router=router)]
|
||||
event_handler.assert_has_calls(expected_calls)
|
||||
|
||||
def test_agent_notifications_for_router_events(self):
|
||||
self._test_agent_notifications_for_router_events()
|
||||
|
||||
def test_agent_notifications_for_router_events_ha(self):
|
||||
self._test_agent_notifications_for_router_events(enable_ha=True)
|
||||
|
||||
def test_legacy_router_update_floatingip_statuses(self):
|
||||
self._test_update_floatingip_statuses(
|
||||
self.generate_router_info(enable_ha=False))
|
||||
|
|
|
@ -262,39 +262,6 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
|
|||
n_const.HA_ROUTER_STATE_STANDBY)
|
||||
queue_event.assert_not_called()
|
||||
|
||||
def test_periodic_sync_routers_task_call_check_ha_state_for_router(self):
|
||||
agent = l3_agent.L3NATAgentWithStateReport(HOSTNAME, self.conf)
|
||||
ha_id = _uuid()
|
||||
active_routers = [
|
||||
{'id': ha_id,
|
||||
n_const.HA_ROUTER_STATE_KEY: n_const.HA_ROUTER_STATE_STANDBY,
|
||||
'ha': True},
|
||||
{'id': _uuid()}]
|
||||
self.plugin_api.get_router_ids.return_value = [r['id'] for r
|
||||
in active_routers]
|
||||
self.plugin_api.get_routers.return_value = active_routers
|
||||
with mock.patch.object(agent, 'check_ha_state_for_router') as check:
|
||||
agent.periodic_sync_routers_task(agent.context)
|
||||
check.assert_called_once_with(ha_id,
|
||||
n_const.HA_ROUTER_STATE_STANDBY)
|
||||
|
||||
def test_periodic_sync_routers_task_not_check_ha_state_for_router(self):
|
||||
# DVR-only agent should not trigger ha state check
|
||||
self.conf.set_override('agent_mode', lib_constants.L3_AGENT_MODE_DVR)
|
||||
agent = l3_agent.L3NATAgentWithStateReport(HOSTNAME, self.conf)
|
||||
ha_id = _uuid()
|
||||
active_routers = [
|
||||
{'id': ha_id,
|
||||
n_const.HA_ROUTER_STATE_KEY: n_const.HA_ROUTER_STATE_STANDBY,
|
||||
'ha': True},
|
||||
{'id': _uuid()}]
|
||||
self.plugin_api.get_router_ids.return_value = [r['id'] for r
|
||||
in active_routers]
|
||||
self.plugin_api.get_routers.return_value = active_routers
|
||||
with mock.patch.object(agent, 'check_ha_state_for_router') as check:
|
||||
agent.periodic_sync_routers_task(agent.context)
|
||||
self.assertFalse(check.called)
|
||||
|
||||
def test_periodic_sync_routers_task_raise_exception(self):
|
||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
self.plugin_api.get_router_ids.return_value = ['fake_id']
|
||||
|
|
|
@ -90,6 +90,8 @@ class TestBasicRouterOperations(base.BaseTestCase):
|
|||
|
||||
def test_destroy_state_change_monitor_ok(self):
|
||||
ri = self._create_router(mock.MagicMock())
|
||||
# need a port for destroy_state_change_monitor() to call PM code
|
||||
ri.ha_port = {'id': _uuid()}
|
||||
with mock.patch.object(ri,
|
||||
'_get_state_change_monitor_process_manager')\
|
||||
as m_get_state:
|
||||
|
@ -102,6 +104,8 @@ class TestBasicRouterOperations(base.BaseTestCase):
|
|||
|
||||
def test_destroy_state_change_monitor_force(self):
|
||||
ri = self._create_router(mock.MagicMock())
|
||||
# need a port for destroy_state_change_monitor() to call PM code
|
||||
ri.ha_port = {'id': _uuid()}
|
||||
with mock.patch.object(ri,
|
||||
'_get_state_change_monitor_process_manager')\
|
||||
as m_get_state:
|
||||
|
|
|
@ -101,3 +101,13 @@ class TestExclusiveRouterProcessor(base.BaseTestCase):
|
|||
raise Exception("Only the master should process a router")
|
||||
|
||||
self.assertEqual(2, len([i for i in master.updates()]))
|
||||
|
||||
def test_hit_retry_limit(self):
|
||||
tries = 1
|
||||
queue = l3_queue.RouterProcessingQueue()
|
||||
update = l3_queue.RouterUpdate(FAKE_ID, l3_queue.PRIORITY_RPC,
|
||||
tries=tries)
|
||||
queue.add(update)
|
||||
self.assertFalse(update.hit_retry_limit())
|
||||
queue.add(update)
|
||||
self.assertTrue(update.hit_retry_limit())
|
||||
|
|
Loading…
Reference in New Issue