Ensure l3 agent receives notification about added router

Currently router_added (and other) notifications are sent
to agents with an RPC cast() method which does not ensure that
the message is actually delivered to the recipient.
If the message is lost (due to instability of messaging system
in some failover scenarios for example) neither server nor agent
will be aware of that and router will be "lost" till next agent
resync. Resync will only happen in case of errors on agent side
or restart.
The fix makes server use call() to notify agents about added routers
thus ensuring no routers will be lost.

This also unifies reschedule_router() method to avoid code duplication
between legacy and dvr agent schedulers.

Closes-Bug: #1482630
Related-Bug: #1404743
Change-Id: Id08764ba837d8f47a28649d081a5876797fe369e
This commit is contained in:
Oleg Bondarev 2015-08-07 15:44:29 +03:00
parent 7a9e8175c9
commit 30b121dfa4
5 changed files with 102 additions and 50 deletions

View File

@@ -38,13 +38,15 @@ class L3AgentNotifyAPI(object):
target = oslo_messaging.Target(topic=topic, version='1.0')
self.client = n_rpc.get_client(target)
def _notification_host(self, context, method, host, **kwargs):
def _notification_host(self, context, method, host, use_call=False,
**kwargs):
"""Notify the agent that is hosting the router."""
LOG.debug('Notify agent at %(host)s the message '
'%(method)s', {'host': host,
'method': method})
cctxt = self.client.prepare(server=host)
cctxt.cast(context, method, **kwargs)
rpc_method = cctxt.call if use_call else cctxt.cast
rpc_method(context, method, **kwargs)
def _agent_notification(self, context, method, router_ids, operation,
shuffle_agents):
@@ -156,8 +158,12 @@ class L3AgentNotifyAPI(object):
payload={'router_id': router_id})
def router_added_to_agent(self, context, router_ids, host):
# need to use call here as we want to be sure agent received
# notification and router will not be "lost". However using call()
# itself is not a guarantee, calling code should handle exceptions and
# retry
self._notification_host(context, 'router_added_to_agent', host,
payload=router_ids)
use_call=True, payload=router_ids)
def routers_updated_on_host(self, context, router_ids, host):
self._notification_host(context, 'routers_updated', host,

View File

@@ -55,6 +55,10 @@ L3_AGENTS_SCHEDULER_OPTS = [
cfg.CONF.register_opts(L3_AGENTS_SCHEDULER_OPTS)
# default messaging timeout is 60 sec, so 2 here is chosen to not block API
# call for more than 2 minutes
AGENT_NOTIFY_MAX_ATTEMPTS = 2
class RouterL3AgentBinding(model_base.BASEV2):
"""Represents binding between neutron routers and L3 agents."""
@@ -270,7 +274,7 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
query.delete()
def reschedule_router(self, context, router_id, candidates=None):
"""Reschedule router to a new l3 agent
"""Reschedule router to (a) new l3 agent(s)
Remove the router from the agent(s) currently hosting it and
schedule it again
@@ -281,19 +285,45 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
for agent in cur_agents:
self._unbind_router(context, router_id, agent['id'])
new_agent = self.schedule_router(context, router_id,
candidates=candidates)
if not new_agent:
self.schedule_router(context, router_id, candidates=candidates)
new_agents = self.list_l3_agents_hosting_router(
context, router_id)['agents']
if not new_agents:
raise l3agentscheduler.RouterReschedulingFailed(
router_id=router_id)
self._notify_agents_router_rescheduled(context, router_id,
cur_agents, new_agents)
def _notify_agents_router_rescheduled(self, context, router_id,
old_agents, new_agents):
l3_notifier = self.agent_notifiers.get(constants.AGENT_TYPE_L3)
if l3_notifier:
for agent in cur_agents:
l3_notifier.router_removed_from_agent(
context, router_id, agent['host'])
l3_notifier.router_added_to_agent(
context, [router_id], new_agent.host)
if not l3_notifier:
return
old_hosts = [agent['host'] for agent in old_agents]
new_hosts = [agent['host'] for agent in new_agents]
for host in set(old_hosts) - set(new_hosts):
l3_notifier.router_removed_from_agent(
context, router_id, host)
for agent in new_agents:
# Need to make sure agents are notified or unschedule otherwise
for attempt in range(AGENT_NOTIFY_MAX_ATTEMPTS):
try:
l3_notifier.router_added_to_agent(
context, [router_id], agent['host'])
break
except oslo_messaging.MessagingException:
LOG.warning(_LW('Failed to notify L3 agent on host '
'%(host)s about added router. Attempt '
'%(attempt)d out of %(max_attempts)d'),
{'host': agent['host'], 'attempt': attempt + 1,
'max_attempts': AGENT_NOTIFY_MAX_ATTEMPTS})
else:
self._unbind_router(context, router_id, agent['id'])
raise l3agentscheduler.RouterReschedulingFailed(
router_id=router_id)
def list_routers_on_l3_agent(self, context, agent_id):
query = context.session.query(RouterL3AgentBinding.router_id)

View File

@@ -382,42 +382,12 @@ class L3_DVRsch_db_mixin(l3agent_sch_db.L3AgentSchedulerDbMixin):
context, router_id, chosen_agent)
return chosen_agent
def reschedule_router(self, context, router_id, candidates=None):
"""Reschedule router to new l3 agents
Remove the router from l3 agents currently hosting it and
schedule it again
"""
def _unbind_router(self, context, router_id, agent_id):
router = self.get_router(context, router_id)
is_distributed = router.get('distributed', False)
if not is_distributed:
return super(L3_DVRsch_db_mixin, self).reschedule_router(
context, router_id, candidates)
old_agents = self.list_l3_agents_hosting_router(
context, router_id)['agents']
with context.session.begin(subtransactions=True):
for agent in old_agents:
self._unbind_router(context, router_id, agent['id'])
self.unbind_snat_servicenode(context, router_id)
self.schedule_router(context, router_id, candidates=candidates)
new_agents = self.list_l3_agents_hosting_router(
context, router_id)['agents']
if not new_agents:
raise l3agentscheduler.RouterReschedulingFailed(
router_id=router_id)
l3_notifier = self.agent_notifiers.get(n_const.AGENT_TYPE_L3)
if l3_notifier:
old_hosts = [agent['host'] for agent in old_agents]
new_hosts = [agent['host'] for agent in new_agents]
for host in set(old_hosts) - set(new_hosts):
l3_notifier.router_removed_from_agent(
context, router_id, host)
for host in new_hosts:
l3_notifier.router_added_to_agent(
context, [router_id], host)
super(L3_DVRsch_db_mixin, self)._unbind_router(context, router_id,
agent_id)
if router.get('distributed', False):
self.unbind_snat(context, router_id, agent_id)
def _get_active_l3_agent_routers_sync_data(self, context, host, agent,
router_ids):

View File

@@ -229,6 +229,7 @@ class OvsAgentSchedulerTestCaseBase(test_l3.L3NatTestCaseMixin,
service_plugins = {'l3_plugin_name': self.l3_plugin}
else:
service_plugins = None
mock.patch('neutron.common.rpc.get_client').start()
super(OvsAgentSchedulerTestCaseBase, self).setUp(
self.plugin_str, service_plugins=service_plugins)
ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
@@ -773,6 +774,49 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase):
self.assertIn(dvr_agent['host'],
[a['host'] for a in agents['agents']])
def test_router_reschedule_succeeded_after_failed_notification(self):
l3_plugin = (manager.NeutronManager.get_service_plugins()
[service_constants.L3_ROUTER_NAT])
l3_notifier = l3_plugin.agent_notifiers[constants.AGENT_TYPE_L3]
l3_rpc_cb = l3_rpc.L3RpcCallback()
self._register_agent_states()
with self.router() as router:
# schedule the router to host A
l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA)
with mock.patch.object(
l3_notifier, 'router_added_to_agent') as notification_mock:
notification_mock.side_effect = [
oslo_messaging.MessagingTimeout, None]
self._take_down_agent_and_run_reschedule(L3_HOSTA)
self.assertEqual(
2, l3_notifier.router_added_to_agent.call_count)
# make sure router was rescheduled even when first attempt
# failed to notify l3 agent
l3_agents = self._list_l3_agents_hosting_router(
router['router']['id'])['agents']
self.assertEqual(1, len(l3_agents))
self.assertEqual(L3_HOSTB, l3_agents[0]['host'])
def test_router_reschedule_failed_notification_all_attempts(self):
l3_plugin = (manager.NeutronManager.get_service_plugins()
[service_constants.L3_ROUTER_NAT])
l3_notifier = l3_plugin.agent_notifiers[constants.AGENT_TYPE_L3]
l3_rpc_cb = l3_rpc.L3RpcCallback()
self._register_agent_states()
with self.router() as router:
# schedule the router to host A
l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA)
with mock.patch.object(
l3_notifier, 'router_added_to_agent') as notification_mock:
notification_mock.side_effect = oslo_messaging.MessagingTimeout
self._take_down_agent_and_run_reschedule(L3_HOSTA)
self.assertEqual(
l3_agentschedulers_db.AGENT_NOTIFY_MAX_ATTEMPTS,
l3_notifier.router_added_to_agent.call_count)
l3_agents = self._list_l3_agents_hosting_router(
router['router']['id'])['agents']
self.assertEqual(0, len(l3_agents))
def test_router_auto_schedule_with_invalid_router(self):
with self.router() as router:
l3_rpc_cb = l3_rpc.L3RpcCallback()
@@ -1494,7 +1538,7 @@ class OvsL3AgentNotifierTestCase(test_l3.L3NatTestCaseMixin,
l3_notifier.client,
'prepare',
return_value=l3_notifier.client) as mock_prepare,\
mock.patch.object(l3_notifier.client, 'cast') as mock_cast,\
mock.patch.object(l3_notifier.client, 'call') as mock_call,\
self.router() as router1:
self._register_agent_states()
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
@@ -1503,7 +1547,7 @@ class OvsL3AgentNotifierTestCase(test_l3.L3NatTestCaseMixin,
router1['router']['id'])
routers = [router1['router']['id']]
mock_prepare.assert_called_with(server='hosta')
mock_cast.assert_called_with(
mock_call.assert_called_with(
mock.ANY, 'router_added_to_agent', payload=routers)
notifications = fake_notifier.NOTIFICATIONS
expected_event_type = 'l3_agent.router.add'
@@ -1518,6 +1562,7 @@ class OvsL3AgentNotifierTestCase(test_l3.L3NatTestCaseMixin,
'prepare',
return_value=l3_notifier.client) as mock_prepare,\
mock.patch.object(l3_notifier.client, 'cast') as mock_cast,\
mock.patch.object(l3_notifier.client, 'call'),\
self.router() as router1:
self._register_agent_states()
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,

View File

@@ -1646,6 +1646,7 @@ class L3HATestCaseMixin(testlib_api.SqlTestCase,
super(L3HATestCaseMixin, self).setUp()
self.adminContext = n_context.get_admin_context()
mock.patch('neutron.common.rpc.get_client').start()
self.plugin = L3HAPlugin()
self.setup_coreplugin('neutron.plugins.ml2.plugin.Ml2Plugin')